class Upperkut::Strategies::PriorityQueue
Public: Queue that prevent a single tenant from taking over.
Constants
- DEQUEUE_ITEM
Uses ZPOP* functions available only on redis 5.0.0+
- ENQUEUE_ITEM
Logic as follows:
We keep the last score used for each tenant key. One tenant_key is
an tenant unique id. To calculate the next_score we use max(current_tenant_score, current_global_score) + increment we store the queue in a sorted set using the next_score as ordering key if one tenant sends lots of messages, this tenant ends up with lots of messages in the queue spaced by increment if another tenant then sends a message, since it previous_tenant_score is lower than the first tenant, it will be inserted before it in the queue.
In other words, the idea of this queue is to not allowing an tenant
that sends a lot of messages to dominate processing and give a chance for tenants that sends few messages to have a fair share of processing time.
- ONE_DAY_IN_SECONDS
Public Class Methods
new(worker, options)
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 72 def initialize(worker, options) @worker = worker @options = options @priority_key = options.fetch(:priority_key) @redis_options = options.fetch(:redis, {}) @max_wait = options.fetch( :max_wait, Integer(ENV['UPPERKUT_MAX_WAIT'] || 20) ) @batch_size = options.fetch( :batch_size, Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000) ) @waiting_time = 0 raise ArgumentError, 'Invalid priority_key. ' \ 'Must be a lambda' unless @priority_key.respond_to?(:call) end
Public Instance Methods
ack(_items)
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 142 def ack(_items); end
clear()
click to toggle source
Public: Clear all data related to the strategy.
# File lib/upperkut/strategies/priority_queue.rb, line 138 def clear redis { |conn| conn.del(queue_key) } end
fetch_items()
click to toggle source
Public: Retrieve events from Strategy.
Returns an Array containing events as hash.
# File lib/upperkut/strategies/priority_queue.rb, line 125 def fetch_items batch_size = [@batch_size, size].min items = redis do |conn| conn.eval(DEQUEUE_ITEM, keys: [checkpoint_key, queue_key], argv: [batch_size]) end decode_json_items(items) end
metrics()
click to toggle source
Public: Consolidated strategy metrics.
Returns hash containing metric name and values.
# File lib/upperkut/strategies/priority_queue.rb, line 164 def metrics { 'size' => size } end
nack(items)
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 144 def nack(items) push_items(items) end
process?()
click to toggle source
Public: Tells when to execute the event processing, when this condition is met so the events are dispatched to the worker.
# File lib/upperkut/strategies/priority_queue.rb, line 151 def process? if fulfill_condition?(size) @waiting_time = 0 return true end @waiting_time += @worker.setup.polling_interval false end
push_items(items = [])
click to toggle source
Public: Ingests the event into strategy.
items - The Array of items do be inserted.
Returns true when success, raise when error.
# File lib/upperkut/strategies/priority_queue.rb, line 99 def push_items(items = []) items = normalize_items(items) return false if items.empty? redis do |conn| items.each do |item| priority_key = @priority_key.call(item) score_key = "#{queue_key}:#{priority_key}:score" keys = [checkpoint_key, counter_key, score_key, queue_key] conn.eval(ENQUEUE_ITEM, keys: keys, argv: [encode_json_items(item)]) end end true end
Private Instance Methods
checkpoint_key()
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 172 def checkpoint_key "#{queue_key}:checkpoint" end
counter_key()
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 176 def counter_key "#{queue_key}:counter" end
fulfill_condition?(buff_size)
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 184 def fulfill_condition?(buff_size) return false if buff_size.zero? buff_size >= @batch_size || @waiting_time >= @max_wait end
queue_key()
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 180 def queue_key "upperkut:priority_queue:#{to_underscore(@worker.name)}" end
redis() { |conn| ... }
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 196 def redis raise ArgumentError, 'requires a block' unless block_given? retry_block do redis_pool.with do |conn| yield conn end end end
redis_pool()
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 206 def redis_pool @redis_pool ||= begin if @redis_options.is_a?(ConnectionPool) @redis_options else RedisPool.new(@options.fetch(:redis, {})).create end end end
size()
click to toggle source
# File lib/upperkut/strategies/priority_queue.rb, line 190 def size redis do |conn| conn.zcard(queue_key) end end