class Upperkut::Strategies::PriorityQueue

Public: Queue that prevent a single tenant from taking over.

Constants

DEQUEUE_ITEM

Uses ZPOP* functions available only on redis 5.0.0+

ENQUEUE_ITEM

Logic as follows:

We keep the last score used for each tenant key. One tenant_key is

an tenant unique id. To calculate the next_score we use
max(current_tenant_score, current_global_score) + increment we store
the queue in a sorted set using the next_score as ordering key if one
tenant sends lots of messages, this tenant ends up with lots of
messages in the queue spaced by increment if another tenant then
sends a message, since it previous_tenant_score is lower than the
first tenant, it will be inserted before it in the queue.

In other words, the idea of this queue is to not allowing an tenant

that sends a lot of messages to dominate processing and give a chance
for tenants that sends few messages to have a fair share of
processing time.
ONE_DAY_IN_SECONDS

Public Class Methods

new(worker, options) click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 72
def initialize(worker, options)
  @worker = worker
  @options = options
  @priority_key = options.fetch(:priority_key)
  @redis_options = options.fetch(:redis, {})

  @max_wait = options.fetch(
    :max_wait,
    Integer(ENV['UPPERKUT_MAX_WAIT'] || 20)
  )

  @batch_size = options.fetch(
    :batch_size,
    Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
  )

  @waiting_time = 0

  raise ArgumentError, 'Invalid priority_key. ' \
    'Must be a lambda' unless @priority_key.respond_to?(:call)
end

Public Instance Methods

ack(_items) click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 142
def ack(_items); end
clear() click to toggle source

Public: Clear all data related to the strategy.

# File lib/upperkut/strategies/priority_queue.rb, line 138
def clear
  redis { |conn| conn.del(queue_key) }
end
fetch_items() click to toggle source

Public: Retrieve events from Strategy.

Returns an Array containing events as hash.

# File lib/upperkut/strategies/priority_queue.rb, line 125
def fetch_items
  batch_size = [@batch_size, size].min

  items = redis do |conn|
    conn.eval(DEQUEUE_ITEM,
              keys: [checkpoint_key, queue_key],
              argv: [batch_size])
  end

  decode_json_items(items)
end
metrics() click to toggle source

Public: Consolidated strategy metrics.

Returns hash containing metric name and values.

# File lib/upperkut/strategies/priority_queue.rb, line 164
def metrics
  {
    'size' => size
  }
end
nack(items) click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 144
def nack(items)
  push_items(items)
end
process?() click to toggle source

Public: Tells when to execute the event processing, when this condition is met so the events are dispatched to the worker.

# File lib/upperkut/strategies/priority_queue.rb, line 151
def process?
  if fulfill_condition?(size)
    @waiting_time = 0
    return true
  end

  @waiting_time += @worker.setup.polling_interval
  false
end
push_items(items = []) click to toggle source

Public: Ingests the event into strategy.

items - The Array of items do be inserted.

Returns true when success, raise when error.

# File lib/upperkut/strategies/priority_queue.rb, line 99
def push_items(items = [])
  items = normalize_items(items)
  return false if items.empty?

  redis do |conn|
    items.each do |item|
      priority_key = @priority_key.call(item)
      score_key = "#{queue_key}:#{priority_key}:score"

      keys = [checkpoint_key,
              counter_key,
              score_key,
              queue_key]

      conn.eval(ENQUEUE_ITEM,
                keys: keys,
                argv: [encode_json_items(item)])
    end
  end

  true
end

Private Instance Methods

checkpoint_key() click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 172
def checkpoint_key
  "#{queue_key}:checkpoint"
end
counter_key() click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 176
def counter_key
  "#{queue_key}:counter"
end
fulfill_condition?(buff_size) click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 184
def fulfill_condition?(buff_size)
  return false if buff_size.zero?

  buff_size >= @batch_size || @waiting_time >= @max_wait
end
queue_key() click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 180
def queue_key
  "upperkut:priority_queue:#{to_underscore(@worker.name)}"
end
redis() { |conn| ... } click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 196
def redis
  raise ArgumentError, 'requires a block' unless block_given?

  retry_block do
    redis_pool.with do |conn|
      yield conn
    end
  end
end
redis_pool() click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 206
def redis_pool
  @redis_pool ||= begin
                    if @redis_options.is_a?(ConnectionPool)
                      @redis_options
                    else
                      RedisPool.new(@options.fetch(:redis, {})).create
                    end
                  end
end
size() click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 190
def size
  redis do |conn|
    conn.zcard(queue_key)
  end
end