class Bernstein::RedisQueue

Constants

QUEUE_SET

Public Class Methods

add(message) click to toggle source
# File lib/bernstein/redis_queue.rb, line 16
# Enqueues a message.
#
# Inside a single MULTI block this adds the message id to QUEUE_SET, stores
# the serialized message under its id, and stores the queued state under the
# message's status key. Both stored keys use the configured :key_expiry TTL.
#
# message - object responding to #id and #serialize.
#
# Returns whatever @redis.multi returns.
def self.add(message)
  message_id = message.id
  ttl = @options[:key_expiry]

  @redis.multi do
    @redis.sadd(QUEUE_SET, message_id)
    @redis.setex(message_id, ttl, message.serialize)
    @redis.setex(status_key(message_id), ttl, STATES[:queued])
  end
end
clear() click to toggle source
# File lib/bernstein/redis_queue.rb, line 43
# Empties the queue by deleting the QUEUE_SET key.
#
# Only the set itself is deleted here; the per-message keys written by
# .add are not touched by this method.
#
# Returns the reply of the DEL call.
def self.clear
  @redis.del(QUEUE_SET)
end
configure!(options = {}) click to toggle source
# File lib/bernstein/redis_queue.rb, line 11
# Merges the given options into the current configuration and (re)creates
# the Redis connection, wrapped in the :bernstein namespace.
#
# options - Hash of overrides merged into @options; nil is treated as {}.
#           The :redis entry of the merged options is handed to Redis.new.
#
# Returns the new Redis::Namespace instance (also stored in @redis).
def self.configure!(options = {})
  overrides = options || {}
  @options.merge!(overrides)

  connection = Redis.new(@options[:redis])
  @redis = Redis::Namespace.new(:bernstein, :redis => connection)
end
dequeue(id, mark_as_sent = false) click to toggle source
# File lib/bernstein/redis_queue.rb, line 47
# Takes a message off the queue and records its new state.
#
# id           - id of the message to dequeue.
# mark_as_sent - when truthy the status becomes STATES[:sent];
#                otherwise it becomes STATES[:sending] (default).
#
# Returns the result of remove_and_change_status.
def self.dequeue(id, mark_as_sent = false)
  new_status =
    if mark_as_sent
      STATES[:sent]
    else
      STATES[:sending]
    end

  remove_and_change_status(id, new_status)
end
mark_as_sent(id) click to toggle source
# File lib/bernstein/redis_queue.rb, line 51
# Records STATES[:sent] as the status of the given message.
#
# id - id of the message whose status is updated.
#
# Returns the result of set_status.
def self.mark_as_sent(id)
  sent_state = STATES[:sent]
  set_status(id, sent_state)
end
queued_messages() click to toggle source
# File lib/bernstein/redis_queue.rb, line 28
# Loads every message currently referenced by the queue set.
#
# Ids are read from QUEUE_SET and their payloads fetched with one MGET.
# Missing payloads (nil replies) are dropped, and the ids they belonged to
# are handed to clean_up_queue so the set stops referencing them.
#
# Returns an Array of deserialized Message objects (empty when the set is empty).
def self.queued_messages
  ids = @redis.smembers(QUEUE_SET)
  return [] if ids.empty?

  payloads = @redis.mget(ids).compact
  found = payloads.map { |payload| Message.deserialize(payload) }

  # Fewer payloads than ids means some ids in the set have no stored message.
  clean_up_queue(ids - found.map(&:id)) if found.size < ids.size

  found
end
status(id) click to toggle source
# File lib/bernstein/redis_queue.rb, line 24
# Looks up the status stored for a message.
#
# id - id of the message.
#
# Returns the value stored under the message's status key, or
# STATES[:not_yet_queued] when nothing is stored.
def self.status(id)
  stored = @redis.get(status_key(id))
  return stored if stored

  STATES[:not_yet_queued]
end

Private Class Methods

clean_up_queue(ids_to_remove) click to toggle source
# File lib/bernstein/redis_queue.rb, line 80
# Removes the given ids from the queue set.
#
# All ids are sent in one SREM call rather than one pipelined SREM per id.
# The call is skipped when there is nothing to remove, because SREM needs
# at least one member.
#
# ids_to_remove - Array of message ids to drop from QUEUE_SET.
#
# Returns the reply of the SREM call, or 0 when there are no ids to remove.
def self.clean_up_queue(ids_to_remove)
  ids = Array(ids_to_remove)
  return 0 if ids.empty?

  @redis.srem(QUEUE_SET, ids)
end
remove(id) { || ... } click to toggle source
# File lib/bernstein/redis_queue.rb, line 68
# Removes a message from the queue set and deletes its stored payload.
#
# When a block is given, the SREM and the block run inside one MULTI block,
# so commands the block issues are part of the same transaction. The
# payload key is deleted afterwards in either case.
#
# id - id of the message to remove.
#
# Returns the reply of the final DEL call.
def self.remove(id)
  unless block_given?
    @redis.srem(QUEUE_SET, id)
    return @redis.del(id)
  end

  @redis.multi do
    @redis.srem(QUEUE_SET, id)
    yield
  end
  @redis.del(id)
end
remove_and_change_status(id, status) click to toggle source
# File lib/bernstein/redis_queue.rb, line 60
# Removes a message from the queue and records its new status, passing the
# status update to remove as a block.
#
# id     - id of the message.
# status - status value to store for the message.
#
# Returns the result of remove.
def self.remove_and_change_status(id, status)
  remove(id) do
    set_status(id, status)
  end
end
set_status(id, status) click to toggle source
# File lib/bernstein/redis_queue.rb, line 64
# Stores a status for a message under its status key, expiring after the
# configured :key_expiry.
#
# id     - id of the message.
# status - status value to store.
#
# Returns the reply of the SETEX call.
def self.set_status(id, status)
  key = status_key(id)
  ttl = @options[:key_expiry]
  @redis.setex(key, ttl, status)
end
status_key(id) click to toggle source
# File lib/bernstein/redis_queue.rb, line 56
# Builds the key under which a message's status is stored.
#
# id - id of the message.
#
# Returns the id followed by "_status", as a String.
def self.status_key(id)
  suffix = "_status"
  "#{id}#{suffix}"
end