class Bernstein::RedisQueue
Constants
- QUEUE_SET
Public Class Methods
add(message)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 16 def self.add(message) @redis.multi do @redis.sadd QUEUE_SET, message.id @redis.setex message.id, @options[:key_expiry], message.serialize @redis.setex status_key(message.id), @options[:key_expiry], STATES[:queued] end end
clear()
click to toggle source
# File lib/bernstein/redis_queue.rb, line 43 def self.clear @redis.del QUEUE_SET end
configure!(options = {})
click to toggle source
# File lib/bernstein/redis_queue.rb, line 11 def self.configure!(options = {}) @options.merge!(options || {}) @redis = Redis::Namespace.new(:bernstein, :redis => Redis.new(@options[:redis])) end
dequeue(id, mark_as_sent = false)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 47 def self.dequeue(id, mark_as_sent = false) remove_and_change_status(id, (mark_as_sent ? STATES[:sent] : STATES[:sending])) end
mark_as_sent(id)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 51 def self.mark_as_sent(id) set_status(id, STATES[:sent]) end
queued_messages()
click to toggle source
# File lib/bernstein/redis_queue.rb, line 28 def self.queued_messages queued_message_ids = @redis.smembers QUEUE_SET messages = [] unless queued_message_ids.empty? messages = @redis.mget(queued_message_ids).compact unless messages.empty? messages.map!{|m| Message.deserialize(m)} end if messages.size < queued_message_ids.size clean_up_queue(queued_message_ids - messages.map{|m| m.id}) end end messages end
status(id)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 24 def self.status(id) @redis.get(status_key(id)) || STATES[:not_yet_queued] end
Private Class Methods
clean_up_queue(ids_to_remove)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 80 def self.clean_up_queue(ids_to_remove) @redis.pipelined do ids_to_remove.each{|id| @redis.srem QUEUE_SET, id} end end
remove(id) { || ... }
click to toggle source
# File lib/bernstein/redis_queue.rb, line 68 def self.remove(id) if block_given? @redis.multi do @redis.srem QUEUE_SET, id yield end else @redis.srem QUEUE_SET, id end @redis.del id end
remove_and_change_status(id, status)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 60 def self.remove_and_change_status(id, status) remove(id){ set_status(id, status) } end
set_status(id, status)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 64 def self.set_status(id, status) @redis.setex status_key(id), @options[:key_expiry], status end
status_key(id)
click to toggle source
# File lib/bernstein/redis_queue.rb, line 56 def self.status_key(id) "#{id}_status" end