class RefillingQueue

Constants

DEFAULT_OPTIONS
VERSION

Public Class Methods

new(client, name, options={}, &block) click to toggle source
# File lib/refilling_queue.rb, line 13
def initialize(client, name, options={}, &block)
  @client, @name, @block = client, name, block
  @page_name = name + "/page"
  @options = DEFAULT_OPTIONS.merge(options)
  raise "Invalid keys" if (options.keys - DEFAULT_OPTIONS.keys).any?
end

Public Instance Methods

clear() click to toggle source
# File lib/refilling_queue.rb, line 27
def clear
  lock do
    mark_as_empty
    @client.del @name
  end
end
pop() click to toggle source
# File lib/refilling_queue.rb, line 20
def pop
  item = _pop
  return item unless item.nil?
  refill
  _pop
end

Private Instance Methods

_pop() click to toggle source
# File lib/refilling_queue.rb, line 40
def _pop
  @client.lpop @name
end
_refill() click to toggle source
# File lib/refilling_queue.rb, line 50
def _refill
  results = if paginate?
    page = (@client.get(@page_name) || 0).to_i
    @block.call(page + 1)
  else
    @block.call
  end
  if results.empty?
    mark_as_empty
    return
  end

  @client.pipelined do
    @client.del @name
    @client.rpush @name, results
    @client.expire @name, @options[:refresh_every] if @options[:refresh_every]
    if paginate?
      @client.incr @page_name
      @client.expire @page_name, @options[:refresh_every] if @options[:refresh_every]
    end
  end
end
empty?() click to toggle source
# File lib/refilling_queue.rb, line 73
def empty?
  @client.llen(@name) == 0
end
lock() { || ... } click to toggle source
# File lib/refilling_queue.rb, line 81
def lock
  lock = "#{@name}_lock"

  # transaction: prevent infinite lock when process dies immediately after setting lock
  acquired, _ = @client.multi do
    @client.setnx lock, "1"
    @client.expire lock, @options[:lock_timeout]
  end
  raise RefillingQueue::Locked unless [true, 1].include?(acquired)

  begin
    yield
  ensure
    @client.del lock, "1"
  end
end
mark_as_empty() click to toggle source
# File lib/refilling_queue.rb, line 77
def mark_as_empty
  @client.del @page_name if paginate?
end
paginate?() click to toggle source
# File lib/refilling_queue.rb, line 36
def paginate?
  @options[:paginate]
end
refill() click to toggle source
# File lib/refilling_queue.rb, line 44
def refill
  lock do
    _refill
  end
end