class Prop::LeakyBucketStrategy
Public Class Methods
_throttle_leaky_bucket(handle, key, cache_key, options)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 8 def _throttle_leaky_bucket(handle, key, cache_key, options) (over_limit, bucket) = options.key?(:decrement) ? decrement(cache_key, options.fetch(:decrement), options) : increment(cache_key, options.fetch(:increment, 1), options) [over_limit, bucket] end
build(options)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 73 def build(options) key = options.fetch(:key) handle = options.fetch(:handle) cache_key = Prop::Key.normalize([ handle, key ]) "prop/leaky_bucket/#{Digest::MD5.hexdigest(cache_key)}" end
cache()
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 105 def cache Prop::Limiter.cache end
compare_threshold?(bucket, operator, options)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 69 def compare_threshold?(bucket, operator, options) bucket.fetch(:over_limit, false) end
counter(cache_key, options)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 16 def counter(cache_key, options) cache.read(cache_key) || zero_counter end
decrement(cache_key, amount, options)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 54 def decrement(cache_key, amount, options) now = Time.now.to_i bucket = counter(cache_key, options) leak_amount = leak_amount(bucket, amount, options, now) bucket[:bucket] = [bucket[:bucket] - amount - leak_amount, 0].max bucket[:last_leak_time] = now if leak_amount > 0 bucket[:over_limit] = false cache.write(cache_key, bucket) [false, bucket] end
increment(cache_key, amount, options)
click to toggle source
WARNING: race condition this increment is not atomic, so it might miss counts when used frequently
# File lib/prop/leaky_bucket_strategy.rb, line 34 def increment(cache_key, amount, options) bucket = counter(cache_key, options) now = Time.now.to_i max_bucket_size = options.fetch(:burst_rate) current_bucket_size = bucket.fetch(:bucket, 0) leak_amount = leak_amount(bucket, amount, options, now) if leak_amount > 0 # maybe TODO, update last_leak_time to reflect the exact time for the current leak amount # the current strategy will always reflect a little less leakage, probably not an issue though bucket[:last_leak_time] = now current_bucket_size = [(current_bucket_size - leak_amount), 0].max end over_limit, updated_bucket_size = update_bucket(current_bucket_size, max_bucket_size, amount) bucket[:bucket] = updated_bucket_size bucket[:over_limit] = over_limit cache.write(cache_key, bucket) [over_limit, bucket] end
leak_amount(bucket, amount, options, now)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 20 def leak_amount(bucket, amount, options, now) leak_rate = (now - bucket.fetch(:last_leak_time, 0)) / options.fetch(:interval).to_f leak_amount = (leak_rate * options.fetch(:threshold).to_f) leak_amount.to_i end
reset(cache_key, options = {})
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 65 def reset(cache_key, options = {}) cache.write(cache_key, zero_counter, raw: true) end
threshold_reached(options)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 82 def threshold_reached(options) burst_rate = options.fetch(:burst_rate) threshold = options.fetch(:threshold) "#{options[:handle]} threshold of #{threshold} tries per #{options[:interval]}s and burst rate #{burst_rate} tries exceeded for key #{options[:key].inspect}, hash #{options[:cache_key]}" end
update_bucket(current_bucket_size, max_bucket_size, amount)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 26 def update_bucket(current_bucket_size, max_bucket_size, amount) over_limit = (max_bucket_size-current_bucket_size) < amount updated_bucket_size = over_limit ? current_bucket_size : current_bucket_size + amount [over_limit, updated_bucket_size] end
validate_options!(options)
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 89 def validate_options!(options) Prop::IntervalStrategy.validate_options!(options) if !options[:burst_rate].is_a?(Integer) || options[:burst_rate] < options[:threshold] raise ArgumentError.new(":burst_rate must be an Integer and not less than :threshold") end if options[:first_throttled] raise ArgumentError.new(":first_throttled is not supported") end end
zero_counter()
click to toggle source
# File lib/prop/leaky_bucket_strategy.rb, line 101 def zero_counter { bucket: 0, last_leak_time: 0, over_limit: false } end