class Upperkut::Strategies::BufferedQueue
Constants
- ACK_ITEMS
- DEQUEUE_ITEMS
- NACK_ITEMS
Attributes
options[R]
Public Class Methods
new(worker, options = {})
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 58

# Builds the strategy for +worker+.
#
# +options+ keys read here:
# [:redis]          stored as the Redis options (default: empty hash)
# [:ack_wait_limit] falls back to ENV UPPERKUT_ACK_WAIT_LIMIT, then 120
# [:max_wait]       falls back to ENV UPPERKUT_MAX_WAIT, then 20
# [:batch_size]     falls back to ENV UPPERKUT_BATCH_SIZE, then 1000
#
# The environment fallbacks are resolved lazily (block form of Hash#fetch),
# so a malformed environment variable only raises ArgumentError when the
# corresponding option was not passed explicitly.
def initialize(worker, options = {})
  @options = options
  @redis_options = options.fetch(:redis, {})
  @worker = worker

  @ack_wait_limit = options.fetch(:ack_wait_limit) do
    Integer(ENV['UPPERKUT_ACK_WAIT_LIMIT'] || 120)
  end

  @max_wait = options.fetch(:max_wait) do
    Integer(ENV['UPPERKUT_MAX_WAIT'] || 20)
  end

  @batch_size = options.fetch(:batch_size) do
    Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
  end

  # Accumulated polling time since the last batch became ready.
  @waiting_time = 0
end
Public Instance Methods
ack(items)
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 108

# Runs the ACK_ITEMS script against the processing key, passing the
# JSON-encoded +items+ as script arguments.
#
# Raises ArgumentError ('Invalid item') when any element is not an Item.
def ack(items)
  has_invalid = items.any? { |candidate| !candidate.is_a?(Item) }
  raise ArgumentError, 'Invalid item' if has_invalid

  redis do |conn|
    conn.eval(
      ACK_ITEMS,
      keys: [processing_key],
      argv: encode_json_items(items)
    )
  end
end
clear()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 104

# Deletes the buffer list stored under +key+.
# Only +key+ is removed here; the processing key is not touched.
def clear
  redis do |conn|
    conn.del(key)
  end
end
fetch_items()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 92

# Runs the DEQUEUE_ITEMS script and returns the decoded items.
#
# The script receives the buffer and processing keys, plus three arguments:
# the number of items to take (never more than the current buffer size),
# the current UTC epoch second, and that same instant minus
# +@ack_wait_limit+.
def fetch_items
  batch_size = [@batch_size, size].min

  # Sample the clock once so both timestamps describe the same instant;
  # two separate reads can straddle a second boundary and skew the cutoff.
  now = Time.now.utc.to_i

  items = redis do |conn|
    conn.eval(
      DEQUEUE_ITEMS,
      keys: [key, processing_key],
      argv: [batch_size, now, now - @ack_wait_limit]
    )
  end

  decode_json_items(items)
end
metrics()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 140

# Returns a hash with the keys 'latency', 'oldest_unacked_item_age'
# and 'size'. The latency is computed once and reused for the age value.
def metrics
  queue_latency = latency
  oldest_age = oldest_item_age(queue_latency)

  {
    'latency' => queue_latency,
    'oldest_unacked_item_age' => oldest_age,
    'size' => size
  }
end
nack(items)
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 118

# Runs the NACK_ITEMS script against the buffer and processing keys,
# passing the JSON-encoded +items+ as script arguments.
#
# Raises ArgumentError ('Invalid item') when any element is not an Item.
def nack(items)
  has_invalid = items.any? { |candidate| !candidate.is_a?(Item) }
  raise ArgumentError, 'Invalid item' if has_invalid

  redis do |conn|
    conn.eval(
      NACK_ITEMS,
      keys: [key, processing_key],
      argv: encode_json_items(items)
    )
  end
end
process?()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 128

# Reports whether the buffer should be processed now.
#
# When the condition is not met, the worker's polling interval is added to
# the accumulated waiting time and false is returned. When it is met, the
# waiting time is reset to zero and true is returned.
def process?
  unless fulfill_condition?(size)
    @waiting_time += @worker.setup.polling_interval
    return false
  end

  @waiting_time = 0
  true
end
push_items(items = [])
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 81

# Normalizes +items+ and appends their JSON encoding to the tail of the
# buffer list (RPUSH).
#
# Returns false without touching Redis when nothing is left after
# normalization, true otherwise.
def push_items(items = [])
  normalized = normalize_items(items)
  return false if normalized.empty?

  redis { |conn| conn.rpush(key, encode_json_items(normalized)) }

  true
end
Private Instance Methods
fulfill_condition?(buff_size)
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 160

# Decides whether a buffer holding +buff_size+ items is ready.
#
# An empty buffer is never ready. Otherwise it is ready once it holds at
# least +@batch_size+ items, or once the accumulated waiting time has
# reached +@max_wait+.
def fulfill_condition?(buff_size)
  if buff_size.zero?
    false
  else
    buff_size >= @batch_size || @waiting_time >= @max_wait
  end
end
key()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 152

# Redis key of the buffer list: the 'upperkut:buffers:' prefix followed by
# the worker's name passed through to_underscore.
def key
  worker_slug = to_underscore(@worker.name)
  "upperkut:buffers:#{worker_slug}"
end
latency()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 182

# Seconds (Float) between now and the +enqueued_at+ of the item at the
# head of the buffer list. Returns 0 when the list yields no item.
def latency
  head = redis { |conn| conn.lrange(key, 0, 0) }
  oldest = decode_json_items(head).first

  oldest ? Time.now.to_f - oldest.enqueued_at.to_f : 0
end
oldest_item_age(current_latency)
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 166

# Returns the larger of +current_latency+ and the age, in seconds, of the
# first member of the processing sorted set (0 when that set is empty).
def oldest_item_age(current_latency)
  oldest = redis do |conn|
    decode_json_items(conn.zrange(processing_key, 0, 0)).first
  end

  processing_age = oldest ? Time.now.to_f - oldest.enqueued_at.to_f : 0

  [current_latency, processing_age].max
end
processing_key()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 156

# Redis key used for in-flight items: the buffer key with a ':processing'
# suffix.
def processing_key
  format('%s:processing', key)
end
redis() { |conn| ... }
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 197

# Yields a connection checked out from the pool, wrapped in retry_block.
#
# Raises ArgumentError ('requires a block') when called without a block.
def redis
  raise ArgumentError, 'requires a block' unless block_given?

  retry_block do
    redis_pool.with { |conn| yield conn }
  end
end
redis_pool()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 207

# Memoized connection pool. If the configured Redis options already are a
# ConnectionPool, that object is used as-is; otherwise a pool is created
# from the options through RedisPool.
def redis_pool
  return @redis_pool if @redis_pool

  @redis_pool =
    if @redis_options.is_a?(ConnectionPool)
      @redis_options
    else
      RedisPool.new(@redis_options).create
    end
end
size()
click to toggle source
# File lib/upperkut/strategies/buffered_queue.rb, line 191

# Length of the buffer list (LLEN on +key+).
def size
  redis { |conn| conn.llen(key) }
end