class Upperkut::Strategies::BufferedQueue

Constants

ACK_ITEMS
DEQUEUE_ITEMS
NACK_ITEMS

Attributes

options[R]

Public Class Methods

new(worker, options = {}) click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 58
# Sets up the strategy state for +worker+.
#
# Each tunable is looked up in +options+ first. Only when the key is
# absent is the matching environment variable consulted, and only when
# that is unset is the built-in default used:
#
#   :ack_wait_limit  UPPERKUT_ACK_WAIT_LIMIT  120
#   :max_wait        UPPERKUT_MAX_WAIT        20
#   :batch_size      UPPERKUT_BATCH_SIZE      1000
#
# The fallbacks are passed to Hash#fetch as blocks so the environment is
# parsed lazily: a malformed variable cannot break construction when the
# caller supplied the option explicitly.
#
# @param worker  [Object] stored unchanged in @worker
# @param options [Hash] may carry :redis, :ack_wait_limit, :max_wait
#   and :batch_size; the hash itself is kept in @options
# @raise [ArgumentError] when an environment variable that is actually
#   needed cannot be converted by Integer()
def initialize(worker, options = {})
  @options = options
  @redis_options = options.fetch(:redis, {})
  @worker = worker

  @ack_wait_limit = options.fetch(:ack_wait_limit) do
    Integer(ENV.fetch('UPPERKUT_ACK_WAIT_LIMIT', 120))
  end

  @max_wait = options.fetch(:max_wait) do
    Integer(ENV.fetch('UPPERKUT_MAX_WAIT', 20))
  end

  @batch_size = options.fetch(:batch_size) do
    Integer(ENV.fetch('UPPERKUT_BATCH_SIZE', 1000))
  end

  # Accumulated polling time; starts at zero.
  @waiting_time = 0
end

Public Instance Methods

ack(items) click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 108
# Acknowledges +items+ by evaluating the ACK_ITEMS script against the
# processing key, passing the items as encoded by encode_json_items.
#
# @param items [Enumerable] every element must be an Item
# @return the value produced by the redis block (the script's reply)
# @raise [ArgumentError] if any element is not an Item
def ack(items)
  raise ArgumentError, 'Invalid item' if items.any? { |candidate| !candidate.is_a?(Item) }

  redis do |conn|
    conn.eval(ACK_ITEMS, keys: [processing_key], argv: encode_json_items(items))
  end
end
clear() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 104
# Deletes the buffer key from redis. Only +key+ is removed; the
# processing key is not touched here.
#
# @return the value of conn.del as passed back through the redis block
def clear
  redis do |conn|
    conn.del(key)
  end
end
fetch_items() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 92
# Dequeues up to one batch of items and returns them decoded.
#
# The batch is capped at the smaller of @batch_size and the current
# buffer size. The DEQUEUE_ITEMS script receives the buffer key and the
# processing key, plus three arguments: the batch size, the current UTC
# epoch second, and that same second minus @ack_wait_limit.
#
# The clock is sampled once per attempt so both timestamps come from the
# same instant. Sampling it twice could straddle a second boundary and
# make the two values differ by something other than @ack_wait_limit.
#
# @return the result of decode_json_items applied to the script's reply
def fetch_items
  limit = [@batch_size, size].min

  raw_items = redis do |conn|
    # Read the clock inside the block so each attempt gets a fresh value.
    now = Time.now.utc.to_i

    conn.eval(DEQUEUE_ITEMS,
              keys: [key, processing_key],
              argv: [limit, now, now - @ack_wait_limit])
  end

  decode_json_items(raw_items)
end
metrics() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 140
# Collects queue metrics into a Hash with string keys.
#
# Latency is measured once and reused as the argument to
# oldest_item_age, so both figures come from the same reading.
#
# @return [Hash] 'latency', 'oldest_unacked_item_age' and 'size'
def metrics
  report = { 'latency' => latency }
  report['oldest_unacked_item_age'] = oldest_item_age(report['latency'])
  report['size'] = size
  report
end
nack(items) click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 118
# Negatively acknowledges +items+ by evaluating the NACK_ITEMS script
# with the buffer key and the processing key, passing the items as
# encoded by encode_json_items.
#
# @param items [Enumerable] every element must be an Item
# @return the value produced by the redis block (the script's reply)
# @raise [ArgumentError] if any element is not an Item
def nack(items)
  raise ArgumentError, 'Invalid item' if items.any? { |candidate| !candidate.is_a?(Item) }

  redis do |conn|
    conn.eval(NACK_ITEMS, keys: [key, processing_key], argv: encode_json_items(items))
  end
end
process?() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 128
# Decides whether a batch should be processed now.
#
# When fulfill_condition? rejects the current buffer size, the worker's
# polling interval is added to @waiting_time and false is returned.
# Otherwise @waiting_time is reset to zero and true is returned.
#
# @return [Boolean]
def process?
  unless fulfill_condition?(size)
    @waiting_time += @worker.setup.polling_interval
    return false
  end

  @waiting_time = 0
  true
end
push_items(items = []) click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 81
# Appends +items+ to the buffer list with RPUSH.
#
# The input first goes through normalize_items; if nothing is left,
# redis is not contacted.
#
# @param items [Object] handed to normalize_items (defaults to [])
# @return [Boolean] false when there was nothing to push, true otherwise
def push_items(items = [])
  normalized = normalize_items(items)

  if normalized.empty?
    false
  else
    redis { |conn| conn.rpush(key, encode_json_items(normalized)) }
    true
  end
end

Private Instance Methods

fulfill_condition?(buff_size) click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 160
# Tells whether a buffer of +buff_size+ items is ready for processing.
#
# An empty buffer is never ready. A non-empty one is ready once it holds
# at least @batch_size items or @waiting_time has reached @max_wait.
#
# @param buff_size [Integer] current number of buffered items
# @return [Boolean]
def fulfill_condition?(buff_size)
  if buff_size.zero?
    false
  elsif buff_size >= @batch_size
    true
  else
    @waiting_time >= @max_wait
  end
end
key() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 152
# Name of the redis key holding this worker's buffer: the
# "upperkut:buffers:" prefix followed by to_underscore(@worker.name).
#
# @return [String]
def key
  worker_name = to_underscore(@worker.name)
  "upperkut:buffers:#{worker_name}"
end
latency() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 182
# Age of the item at the head of the buffer list.
#
# Reads index 0 of the list with LRANGE, decodes it, and subtracts its
# enqueued_at (as a float) from the current time (as a float).
#
# @return [Float, Integer] the difference, or 0 when the buffer is empty
def latency
  head = redis do |conn|
    conn.lrange(key, 0, 0)
  end

  oldest = decode_json_items(head).first
  oldest ? Time.now.to_f - oldest.enqueued_at.to_f : 0
end
oldest_item_age(current_latency) click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 166
# Larger of +current_latency+ and the age of the first member of the
# processing sorted set.
#
# The first member is read with ZRANGE 0..0 and decoded inside the redis
# block. Its age is the current time minus its enqueued_at, both as
# floats, or 0 when the set has no members.
#
# @param current_latency [Numeric] an already-measured buffer latency
# @return [Numeric]
def oldest_item_age(current_latency)
  head = redis { |conn| decode_json_items(conn.zrange(processing_key, 0, 0)).first }

  processing_age = head ? Time.now.to_f - head.enqueued_at.to_f : 0

  [current_latency, processing_age].max
end
processing_key() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 156
# Name of the redis key used for in-flight items: the buffer key with a
# ":processing" suffix.
#
# @return [String]
def processing_key
  buffer_key = key
  "#{buffer_key}:processing"
end
redis() { |conn| ... } click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 197
# Yields a connection checked out from redis_pool, with the whole
# checkout wrapped in retry_block.
#
# @yieldparam conn the connection supplied by redis_pool.with
# @return the value that retry_block returns
# @raise [ArgumentError] when called without a block
def redis
  unless block_given?
    raise ArgumentError, 'requires a block'
  end

  retry_block do
    redis_pool.with { |conn| yield conn }
  end
end
redis_pool() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 207
# Memoized connection pool.
#
# If the :redis option was already a ConnectionPool it is used as-is;
# otherwise a pool is built with RedisPool.new(@redis_options).create.
#
# @return the pool held in @redis_pool
def redis_pool
  @redis_pool ||=
    if @redis_options.is_a?(ConnectionPool)
      @redis_options
    else
      RedisPool.new(@redis_options).create
    end
end
size() click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 191
# Length of the buffer list, as reported by LLEN on +key+.
#
# @return the value of conn.llen as passed back through the redis block
def size
  redis { |conn| conn.llen(key) }
end