class Upperkut::Strategies::ScheduledQueue
Public: Encapsulates the methods required to build a Scheduled Queue. Items are queued, but are only fetched at a specific point in time.
Constants
- ZPOPBYRANGE
Attributes
options[R]
Public Class Methods
new(worker, options = {})
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 29
# Public: Builds the strategy for a worker.
#
# worker  - The worker this queue belongs to; stored as-is.
# options - Hash of settings (default: {}):
#           :redis      - Redis settings kept for later connection setup
#                         (default: {}).
#           :batch_size - Limit used when fetching items. When absent, falls
#                         back to ENV['UPPERKUT_BATCH_SIZE'], then to 1000.
#
# Raises ArgumentError if the environment fallback is consulted and does not
# hold a valid integer.
def initialize(worker, options = {})
  @options = options
  @redis_options = @options.fetch(:redis, {})
  @worker = worker
  # Block form keeps the environment lookup lazy: an explicit :batch_size
  # wins without the environment variable being parsed at all, so a
  # malformed variable cannot break an explicitly configured worker.
  @batch_size = @options.fetch(:batch_size) do
    Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
  end
end
Public Instance Methods
ack(_items)
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 74
# Public: Acknowledges a batch of items. This strategy does nothing here;
# the method is an intentional no-op.
#
# _items - Ignored.
#
# Returns nil.
def ack(_items)
end
clear()
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 70
# Public: Deletes this queue's Redis key.
#
# Returns whatever the `redis` helper returns for the DEL call.
def clear
  redis do |conn|
    conn.del(key)
  end
end
fetch_items()
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 55
# Public: Fetches the items that are due.
#
# Asks pop_values for members scored between '-inf' and the current UTC
# time (as a float string), limited to @batch_size, then decodes them.
#
# Returns the result of decode_json_items for the popped values.
def fetch_items
  range = {
    value_from: '-inf'.freeze,
    value_to: Time.now.utc.to_f.to_s,
    limit: @batch_size
  }

  # Assigned from inside the block so the result does not depend on what
  # the `redis` helper itself returns.
  popped = []
  redis { |conn| popped = pop_values(conn, range) }

  decode_json_items(popped)
end
metrics()
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 80
# Public: Collects queue metrics.
#
# Returns a Hash with the String keys 'latency' and 'size', holding the
# values of the latency and size helpers respectively.
def metrics
  stats = {}
  stats['latency'] = latency
  stats['size'] = size
  stats
end
nack(items)
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 76
# Public: Negatively acknowledges items by pushing them back onto the queue.
#
# items - The items to re-enqueue; handed unchanged to push_items.
#
# Returns the result of push_items.
def nack(items)
  push_items(items)
end
process?()
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 87
# Public: Tells whether there is work to process right now.
#
# Counts the members scored between '-inf' and the current UTC time in
# whole seconds, then checks that count with fulfill_condition?.
#
# Returns true or false.
def process?
  upper_bound = Time.now.utc.to_i
  due_count = size('-inf', upper_bound)

  fulfill_condition?(due_count) ? true : false
end
push_items(items = [])
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 40
# Public: Schedules items on the queue.
#
# Each item is given a 'timestamp' attribute if it lacks one, and is added
# to the sorted set with that timestamp as its score.
#
# items - The items to enqueue (default: []); normalized via normalize_items.
#
# Returns false when there is nothing to push, true otherwise.
def push_items(items = [])
  normalized = normalize_items(items)

  if normalized.empty?
    false
  else
    redis do |conn|
      normalized.each do |entry|
        stamped = ensure_timestamp_attr(entry)
        score = stamped.body['timestamp']
        conn.zadd(key, score, encode_json_items(stamped))
      end
    end

    true
  end
end
Private Instance Methods
ensure_timestamp_attr(item)
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 100
# Internal: Guarantees that an item's body carries a 'timestamp' key.
#
# item - An object responding to id, body and enqueued_at.
#
# Returns the same item when its body already has 'timestamp'; otherwise a
# new Item whose body is a copy with 'timestamp' set to the current UTC
# time in whole seconds.
def ensure_timestamp_attr(item)
  if item.body.key?('timestamp')
    item
  else
    Item.new(
      id: item.id,
      body: item.body.merge('timestamp' => Time.now.utc.to_i),
      enqueued_at: item.enqueued_at
    )
  end
end
fulfill_condition?(buff_size)
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 117
# Internal: Decides whether a buffer size warrants processing.
#
# buff_size - A number responding to zero?.
#
# Returns false when buff_size is zero, true otherwise.
def fulfill_condition?(buff_size)
  buff_size.zero? ? false : true
end
key()
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 96
# Internal: Builds the Redis key used by this queue.
#
# Returns a String of the form "upperkut:queued:<name>", where <name> is
# the worker's name passed through to_underscore.
def key
  worker_slug = to_underscore(@worker.name)
  "upperkut:queued:#{worker_slug}"
end
latency()
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 127
# Internal: Measures how long the first due item has been waiting.
#
# Looks up a single member scored between '-inf' and the current UTC time
# and compares its 'timestamp' attribute with now.
#
# Returns the difference in seconds as a Float, or 0 when nothing is due.
def latency
  now = Time.now.utc.to_f

  oldest = redis do |conn|
    raw = conn.zrangebyscore(key, '-inf', now.to_s, limit: [0, 1]).first
    # Nothing is due: skip decoding so the decoder is never handed a nil
    # payload.
    raw ? decode_json_items([raw]).first : nil
  end

  return 0 unless oldest

  now - oldest.body['timestamp'].to_f
end
pop_values(redis_client, args)
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 110
# Internal: Runs the ZPOPBYRANGE script against this queue's key.
#
# redis_client - A connection responding to eval(script, keys:, argv:).
# args         - Hash with :value_from, :value_to and :limit.
#
# Returns whatever the script evaluation returns.
def pop_values(redis_client, args)
  # The script receives its arguments in this fixed order.
  argv = %i[value_from value_to limit].map { |name| args[name] }

  redis_client.eval(ZPOPBYRANGE, keys: [key], argv: argv)
end
redis() { |conn| ... }
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 141
# Internal: Yields a connection taken from the pool, wrapped in retry_block
# (defined elsewhere).
#
# Yields the connection object provided by redis_pool.with.
#
# Returns whatever retry_block returns.
# Raises ArgumentError when called without a block.
def redis
  unless block_given?
    raise ArgumentError, 'requires a block'
  end

  retry_block { redis_pool.with { |conn| yield conn } }
end
redis_pool()
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 151
# Internal: Memoized connection pool.
#
# Uses @redis_options directly when it already is a ConnectionPool;
# otherwise builds one with RedisPool.new(@redis_options).create.
#
# Returns the pool.
def redis_pool
  @redis_pool ||=
    if @redis_options.is_a?(ConnectionPool)
      @redis_options
    else
      RedisPool.new(@redis_options).create
    end
end
size(min = '-inf', max = '+inf')
click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 121
# Internal: Counts the queue members whose score lies within a range.
#
# min - Lower score bound passed to ZCOUNT (default: '-inf').
# max - Upper score bound passed to ZCOUNT (default: '+inf').
#
# Returns whatever the `redis` helper returns for the ZCOUNT call.
def size(min = '-inf', max = '+inf')
  redis { |conn| conn.zcount(key, min, max) }
end