class Prop::LeakyBucketStrategy

Public Class Methods

_throttle_leaky_bucket(handle, key, cache_key, options) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 8
def _throttle_leaky_bucket(handle, key, cache_key, options)
  (over_limit, bucket) = options.key?(:decrement) ?
    decrement(cache_key, options.fetch(:decrement), options) :
    increment(cache_key, options.fetch(:increment, 1), options)

  [over_limit, bucket]
end
build(options) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 73
def build(options)
  key       = options.fetch(:key)
  handle    = options.fetch(:handle)

  cache_key = Prop::Key.normalize([ handle, key ])

  "prop/leaky_bucket/#{Digest::MD5.hexdigest(cache_key)}"
end
cache() click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 105
def cache
  Prop::Limiter.cache
end
compare_threshold?(bucket, operator, options) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 69
def compare_threshold?(bucket, operator, options)
  bucket.fetch(:over_limit, false)
end
counter(cache_key, options) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 16
def counter(cache_key, options)
  cache.read(cache_key) || zero_counter
end
decrement(cache_key, amount, options) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 54
def decrement(cache_key, amount, options)
  now = Time.now.to_i
  bucket = counter(cache_key, options)
  leak_amount = leak_amount(bucket, amount, options, now)
  bucket[:bucket] = [bucket[:bucket] - amount - leak_amount, 0].max
  bucket[:last_leak_time] = now if leak_amount > 0
  bucket[:over_limit] = false
  cache.write(cache_key, bucket)
  [false, bucket]
end
increment(cache_key, amount, options) click to toggle source

WARNING: race condition this increment is not atomic, so it might miss counts when used frequently

# File lib/prop/leaky_bucket_strategy.rb, line 34
def increment(cache_key, amount, options)
  bucket = counter(cache_key, options)
  now = Time.now.to_i
  max_bucket_size = options.fetch(:burst_rate)
  current_bucket_size = bucket.fetch(:bucket, 0)
  leak_amount = leak_amount(bucket, amount, options, now)
  if leak_amount > 0
    # maybe TODO, update last_leak_time to reflect the exact time for the current leak amount
    # the current strategy will always reflect a little less leakage, probably not an issue though
    bucket[:last_leak_time] = now
    current_bucket_size = [(current_bucket_size - leak_amount), 0].max
  end

  over_limit, updated_bucket_size = update_bucket(current_bucket_size, max_bucket_size, amount)
  bucket[:bucket] = updated_bucket_size
  bucket[:over_limit] = over_limit
  cache.write(cache_key, bucket)
  [over_limit, bucket]
end
leak_amount(bucket, amount, options, now) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 20
def leak_amount(bucket, amount, options, now)
  leak_rate = (now - bucket.fetch(:last_leak_time, 0)) / options.fetch(:interval).to_f
  leak_amount = (leak_rate * options.fetch(:threshold).to_f)
  leak_amount.to_i
end
reset(cache_key, options = {}) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 65
def reset(cache_key, options = {})
  cache.write(cache_key, zero_counter, raw: true)
end
threshold_reached(options) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 82
def threshold_reached(options)
  burst_rate = options.fetch(:burst_rate)
  threshold  = options.fetch(:threshold)

  "#{options[:handle]} threshold of #{threshold} tries per #{options[:interval]}s and burst rate #{burst_rate} tries exceeded for key #{options[:key].inspect}, hash #{options[:cache_key]}"
end
update_bucket(current_bucket_size, max_bucket_size, amount) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 26
def update_bucket(current_bucket_size, max_bucket_size, amount)
  over_limit = (max_bucket_size-current_bucket_size) < amount
  updated_bucket_size = over_limit ? current_bucket_size : current_bucket_size + amount
  [over_limit, updated_bucket_size]
end
validate_options!(options) click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 89
def validate_options!(options)
  Prop::IntervalStrategy.validate_options!(options)

  if !options[:burst_rate].is_a?(Integer) || options[:burst_rate] < options[:threshold]
    raise ArgumentError.new(":burst_rate must be an Integer and not less than :threshold")
  end

  if options[:first_throttled]
    raise ArgumentError.new(":first_throttled is not supported")
  end
end
zero_counter() click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 101
def zero_counter
  { bucket: 0, last_leak_time: 0, over_limit: false }
end