class ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue

Attributes

list_key[R]
redis_pool[R]

Public Class Methods

new(redis_connection_pool, new_list_key) click to toggle source
# File lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb, line 7
def initialize(redis_connection_pool, new_list_key)
  @redis_pool = redis_connection_pool
  @list_key = new_list_key
end

Public Instance Methods

<<(message) click to toggle source
# File lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb, line 12
def <<(message)
  encoded_message = ::Marshal.dump(message)

  redis_pool.with do |redis|
    redis.rpush(list_key, encoded_message)
  end
end
concat(*messages) click to toggle source
# File lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb, line 20
def concat(*messages)
  messages = messages.flatten
  messages.compact!
  return if messages.empty?

  encoded_messages = []
  messages.each do |message|
    encoded_messages << ::Marshal.dump(message)
  end

  redis_pool.with do |redis|
    redis.rpush(list_key, encoded_messages)
  end
end
empty?() click to toggle source
# File lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb, line 35
def empty?
  size <= 0
end
pop_up_to(num_to_pop, opts = {}) click to toggle source
# File lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb, line 39
def pop_up_to(num_to_pop, opts = {})
  case opts
  when TrueClass, FalseClass
    non_bock = opts
  when Hash
    timeout = opts.fetch(:timeout, nil)
    non_block = opts.fetch(:non_block, false)
  end

  queue_size = size
  if queue_size <= 0
    if non_block
      raise ThreadError, "queue empty"
    else
      total_waited_time = 0

      loop do
        total_waited_time += 0.1
        sleep 0.1
        queue_size = size

        if queue_size > 0
          num_to_pop = [num_to_pop, queue_size].min # make sure we don't pop more than size
          return shift(num_to_pop)
        end

        return if timeout && total_waited_time > timeout
      end
    end
  else
    shift(num_to_pop)
  end
end
shift(number) click to toggle source
# File lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb, line 73
def shift(number)
  number = [number, size].min
  return [] if number <= 0

  messages = []
  multi_response = []
  redis_pool.with do |redis|
    multi_response = redis.multi do |pipeline|
      pipeline.lrange(list_key, 0, number - 1)
      pipeline.ltrim(list_key, number, -1)
    end
  end

  messages = multi_response.first
  success = multi_response.last
  return [] if multi_response.size != 2 || success.nil?
  return [] unless success =~ /ok/i

  messages = [] if messages.nil?
  messages = [messages] unless messages.respond_to?(:each)

  shifted_messages = []
  messages.each do |message|
    next if message.nil?

    shifted_messages << ::Marshal.load(message)
  end

  shifted_messages
end
size() click to toggle source
# File lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb, line 104
def size
  redis_pool.with do |redis|
    redis.llen(list_key) || 0
  end
end