class CircularQueue

A thread-safe queue with a size limitation. When more elements than the capacity are added, the queue either loops back on itself (removing the oldest elements first) or raises an error (if `enq!` is used).

Useful for streaming data where keeping up with real-time is more important than consuming every message if load rises and the queue backs up.

Exposes the same interface as the `Queue` from the Ruby stdlib.

Example:

# Capacity of 3
q = CircularQueue.new(3)

q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]

# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]

Attributes

capacity[R]

Returns the maximum number of elements that can be enqueued @return [Integer]

length[R]

Returns the number of elements in the queue @return [Integer]

size[R]

Returns the number of elements in the queue @return [Integer]

Public Class Methods

new(capacity) click to toggle source

Creates a new queue of the specified capacity @param [Integer] capacity the maximum capacity of the queue

# File lib/circular_queue.rb, line 34
# Builds an empty queue backed by a fixed-size array.
#
# @param [Integer] capacity the maximum number of elements the queue can hold
def initialize(capacity)
  # Synchronization state is set up first because #clear locks @mutex.
  @mutex    = Mutex.new
  # Threads currently parked in #deq, in the order they started waiting.
  @waiting  = []

  @capacity = capacity
  @data     = Array.new(@capacity)

  # Resets @size, @front and @back to their starting values.
  clear
end

Public Instance Methods

<<(item)
Alias for: enq
back() click to toggle source

Returns the last/most recent item in the queue @return [Object] Peek at last item without removing

# File lib/circular_queue.rb, line 128
# Peeks at the most recently enqueued item without removing it.
#
# @return [Object, nil] the newest item, or nil when the queue is empty
def back
  @mutex.synchronize do
    # Only slots inside the live window are meaningful, so guard on @size
    # instead of trusting whatever the slot happens to contain.
    #
    # @back is the next write position; the newest item sits one slot before
    # it, wrapping to the end of the array when @back is 0.
    @data[(@back - 1) % @capacity] unless @size.zero?
  end
end
clear() click to toggle source

Removes all items from the queue @return [CircularQueue] the queue itself

# File lib/circular_queue.rb, line 95
# Removes all items from the queue.
#
# @return [CircularQueue] the queue itself
def clear
  @mutex.synchronize do
    # Swap in fresh storage so the discarded items are no longer referenced
    # by the queue and cannot be read back through a stale slot.
    @data  = Array.new(@capacity)

    @size  = 0
    @front = 0
    @back  = 0
    self
  end
end
data() click to toggle source

Returns the data in the queue, ordered from front to back for easy iteration @return [Array] a copy of the queued items

# File lib/circular_queue.rb, line 143
# Snapshots the queue contents as a plain array, oldest item first.
#
# The backing array is never modified; a new array is returned.
#
# @return [Array] the queued items from front to back
def data
  @mutex.synchronize do
    # Rotate so the front slot lands at index 0, then keep only the live items.
    @data.rotate(@front).first(@size)
  end
end
deq(non_block = false) click to toggle source

Removes an item from the queue @param [Boolean] non_block true to raise an error if the queue is empty;

otherwise, waits for an item to arrive from another thread

@raise [ThreadError] non_block was true and the queue was empty

# File lib/circular_queue.rb, line 76
# Removes and returns the oldest item in the queue.
#
# @param [Boolean] non_block true to raise an error if the queue is empty;
#   otherwise, waits for an item to arrive from another thread
# @raise [ThreadError] non_block was true and the queue was empty
# @return [Object] the dequeued item
def deq(non_block = false)
  @mutex.synchronize do
    begin
      # Re-check after every wake-up: another consumer may have taken the
      # item first, and Mutex#sleep is allowed to wake spuriously.
      while empty?
        raise ThreadError.new("Queue is empty") if non_block

        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      end

      deq_item
    ensure
      # Always deregister on the way out, including when the wait is aborted
      # by an exception. A stale entry would otherwise absorb the wake-up
      # meant for a thread that is still waiting.
      @waiting.delete(Thread.current)
    end
  end
end
Also aliased as: shift, pop
empty?() click to toggle source

Returns whether the queue is empty @return [Boolean] queue is empty

# File lib/circular_queue.rb, line 106
# Reports whether the queue currently holds no items.
#
# Reads @size without locking; #deq calls this while already holding @mutex.
#
# @return [Boolean] true when there is nothing to dequeue
def empty?
  @size.zero?
end
enq(item) click to toggle source

Adds an item to the queue @param [Object] item item to add @return [CircularQueue] the queue itself

# File lib/circular_queue.rb, line 47
# Appends an item to the queue and wakes one waiting consumer, if any.
#
# @param [Object] item item to add
# @return [CircularQueue] the queue itself
def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
  end

  self
end
Also aliased as: <<, push
enq!(item) click to toggle source

Adds an item to the queue, raising an error if the queue is full @param [Object] item item to add @raise [ThreadError] queue is full @return [CircularQueue] the queue itself

# File lib/circular_queue.rb, line 61
# Appends an item to the queue, refusing to overwrite when it is full.
#
# @param [Object] item item to add
# @raise [ThreadError] queue is full
# @return [CircularQueue] the queue itself
def enq!(item)
  @mutex.synchronize do
    raise ThreadError, "Queue is full" if full?

    enq_item(item)
    wakeup_next_waiter
  end

  self
end
Also aliased as: push!
front() click to toggle source

Returns the first/oldest item in the queue @return [Object] Peek at first item without removing

# File lib/circular_queue.rb, line 119
# Peeks at the oldest item in the queue without removing it.
#
# @return [Object, nil] the oldest item, or nil when the queue is empty
def front
  @mutex.synchronize do
    # Only slots inside the live window are meaningful, so guard on @size
    # instead of trusting whatever the slot happens to contain.
    @data[@front] unless @size.zero?
  end
end
full?() click to toggle source

Returns whether the queue is full @return [Boolean] queue is full

# File lib/circular_queue.rb, line 112
# Reports whether the queue has reached its capacity.
#
# Reads @size without locking; #enq! calls this while already holding @mutex.
#
# @return [Boolean] true when the next enqueue would overwrite the oldest item
def full?
  @capacity == @size
end
num_waiting() click to toggle source

Returns the number of threads waiting for items to arrive in the queue @return [Integer] number of threads waiting

# File lib/circular_queue.rb, line 136
# Counts the threads currently registered as waiting for an item.
#
# @return [Integer] number of threads waiting
def num_waiting
  @waiting.size
end
pop(non_block = false)
Alias for: deq
push(item)
Alias for: enq
push!(item)
Alias for: enq!
shift(non_block = false)
Alias for: deq

Private Instance Methods

deq_item() click to toggle source
# File lib/circular_queue.rb, line 168
# Takes the item at the front slot and advances the front index.
#
# Does no locking and no emptiness check of its own; #deq calls it with
# @mutex held after confirming the queue is not empty.
#
# @return [Object] the item that was at the front of the queue
def deq_item
  item = @data[@front]

  # Drop the queue's reference so the consumed item can be garbage collected
  # and the vacated slot never exposes it again.
  @data[@front] = nil

  @size  -= 1
  @front  = (@front + 1) % @capacity

  item
end
enq_item(item) click to toggle source
# File lib/circular_queue.rb, line 154
# Writes an item into the back slot and advances the ring indices.
#
# Does no locking of its own; #enq and #enq! call it with @mutex held.
#
# @param [Object] item item to store
def enq_item(item)
  # Decide up front whether this write lands on the oldest item's slot.
  overwriting = full?

  @data[@back] = item

  if overwriting
    # The oldest item was just replaced, so the front moves on; size stays.
    @front = (@front + 1) % @capacity
  else
    @size += 1
  end

  @back = (@back + 1) % @capacity
end
wakeup_next_waiter() click to toggle source
# File lib/circular_queue.rb, line 179
# Wakes the longest-waiting consumer thread, if there is one.
#
# Waiters whose Thread#wakeup raises ThreadError are dropped and the next
# one in line is tried instead.
#
# @return [Thread, nil] the thread that was woken, or nil if none was
def wakeup_next_waiter
  while (waiter = @waiting.shift)
    begin
      return waiter.wakeup
    rescue ThreadError
      # This waiter cannot be woken; move on to the next one.
      next
    end
  end
end