class Upperkut::Strategies::ScheduledQueue

Public: Encapsulates methods required to build a Scheculed Queue Items are queued, but are only fetched at a specific point in time.

Constants

ZPOPBYRANGE

Attributes

options[R]

Public Class Methods

new(worker, options = {}) click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 29
def initialize(worker, options = {})
  @options = options
  @redis_options = @options.fetch(:redis, {})
  @worker = worker

  @batch_size = @options.fetch(
    :batch_size,
    Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
  )
end

Public Instance Methods

ack(_items) click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 74
def ack(_items); end
clear() click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 70
def clear
  redis { |conn| conn.del(key) }
end
fetch_items() click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 55
def fetch_items
  args = {
    value_from: '-inf'.freeze,
    value_to: Time.now.utc.to_f.to_s,
    limit: @batch_size
  }
  items = []

  redis do |conn|
    items = pop_values(conn, args)
  end

  decode_json_items(items)
end
metrics() click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 80
def metrics
  {
    'latency' => latency,
    'size' => size
  }
end
nack(items) click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 76
def nack(items)
  push_items(items)
end
process?() click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 87
def process?
  buff_size = size('-inf', Time.now.utc.to_i)
  return true if fulfill_condition?(buff_size)

  false
end
push_items(items = []) click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 40
def push_items(items = [])
  items = normalize_items(items)
  return false if items.empty?

  redis do |conn|
    items.each do |item|
      schedule_item = ensure_timestamp_attr(item)
      timestamp = schedule_item.body['timestamp']
      conn.zadd(key, timestamp, encode_json_items(schedule_item))
    end
  end

  true
end

Private Instance Methods

ensure_timestamp_attr(item) click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 100
def ensure_timestamp_attr(item)
  return item if item.body.key?('timestamp')

  Item.new(
    id: item.id,
    body: item.body.merge('timestamp' => Time.now.utc.to_i),
    enqueued_at: item.enqueued_at
  )
end
fulfill_condition?(buff_size) click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 117
def fulfill_condition?(buff_size)
  !buff_size.zero?
end
key() click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 96
def key
  "upperkut:queued:#{to_underscore(@worker.name)}"
end
latency() click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 127
def latency
  now = Time.now.utc
  timestamp = now.to_f

  item = redis do |conn|
    item = conn.zrangebyscore(key, '-inf', timestamp.to_s, limit: [0, 1]).first
    decode_json_items([item]).first
  end

  return timestamp - item.body['timestamp'].to_f if item

  0
end
pop_values(redis_client, args) click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 110
def pop_values(redis_client, args)
  value_from = args[:value_from]
  value_to = args[:value_to]
  limit = args[:limit]
  redis_client.eval(ZPOPBYRANGE, keys: [key], argv: [value_from, value_to, limit])
end
redis() { |conn| ... } click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 141
def redis
  raise ArgumentError, 'requires a block' unless block_given?

  retry_block do
    redis_pool.with do |conn|
      yield conn
    end
  end
end
redis_pool() click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 151
def redis_pool
  @redis_pool ||= begin
                    if @redis_options.is_a?(ConnectionPool)
                      @redis_options
                    else
                      RedisPool.new(@redis_options).create
                    end
                  end
end
size(min = '-inf', max = '+inf') click to toggle source
# File lib/upperkut/strategies/scheduled_queue.rb, line 121
def size(min = '-inf', max = '+inf')
  redis do |conn|
    conn.zcount(key, min, max)
  end
end