class Async::Limiter::Window
Constants
- NULL_TIME
- TYPES
Attributes
count[R]
lock[R]
type[R]
Public Class Methods
new(limit = 1, type: :fixed, window: 1, parent: nil, burstable: true, lock: true, queue: [])
click to toggle source
# File lib/async/limiter/window.rb, line 18
# Sets up the limiter state.
#
# limit     - configured limit; may be fractional (see #update_concurrency).
# type:     - window type; must be one of TYPES (checked by #validate!).
# window:   - configured window length.
# parent:   - object used to spawn tasks when set; otherwise Task.current
#             is used by #async and #schedule.
# burstable: - selects whole-window blocking (#window_blocking?) when
#             truthy, per-frame blocking (#window_frame_blocking?) otherwise.
# lock:     - when truthy, the number of held acquisitions is capped at the
#             limit (see #limit_blocking?).
# queue:    - container holding waiting fibers.
#
# Raises ArgumentError when type, limit or window is invalid.
def initialize(limit = 1, type: :fixed, window: 1, parent: nil,
               burstable: true, lock: true, queue: [])
  @count = 0
  @input_limit = @limit = limit
  @type = type
  @input_window = @window = window
  @parent = parent
  @burstable = burstable
  @lock = lock
  @waiting = queue

  @scheduler = nil
  @yield_wait = false
  @yield_notification = Notification.new

  @window_frame_start_time = NULL_TIME
  @window_start_time = NULL_TIME
  @window_count = 0

  # Validate before normalising: otherwise a negative fractional limit
  # reaches #update_concurrency first and fails there with a RuntimeError
  # instead of the ArgumentError that #limit= raises for the same input.
  validate!
  update_concurrency
end
Public Instance Methods
acquire(*queue_args) { || ... }
click to toggle source
# File lib/async/limiter/window.rb, line 68
# Waits (via #wait) until acquisition is allowed, then records it in the
# counters. With a block, yields and calls #release afterwards, returning
# the block's value; without a block, returns nil and the caller is
# expected to call #release.
def acquire(*queue_args)
  wait(*queue_args)
  @count += 1

  now = Clock.now

  if window_changed?(now)
    # A new window begins: sliding windows start at this acquisition,
    # fixed windows are aligned to a multiple of @window.
    @window_start_time =
      case @type
      when :sliding then now
      when :fixed then (now / @window).to_i * @window
      else raise "invalid type #{@type}"
      end
    @window_count = 1
  else
    @window_count += 1
  end

  @window_frame_start_time = now

  return unless block_given?

  begin
    yield
  ensure
    release
  end
end
async(*queue_args, parent: (@parent || Task.current), **options) { |task| ... }
click to toggle source
# File lib/async/limiter/window.rb, line 53
# Acquires the limiter, then runs the given block via parent.async,
# passing it the task. #release is called when the block finishes,
# whether it returns or raises.
#
# parent: defaults to @parent, falling back to Task.current.
# Remaining keyword options are forwarded to parent.async.
def async(*queue_args, parent: (@parent || Task.current), **options)
  acquire(*queue_args)

  parent.async(**options) do |task|
    begin
      yield task
    ensure
      release
    end
  end
end
blocking?()
click to toggle source
# File lib/async/limiter/window.rb, line 49
# True when any of the three checks (limit, window, window frame)
# currently prevents an acquisition. Checks are evaluated in that order
# and evaluation stops at the first one that blocks.
def blocking?
  return true if limit_blocking?
  return true if window_blocking?

  window_frame_blocking?
end
limit()
click to toggle source
# File lib/async/limiter/window.rb, line 41
# Returns the limit as it was configured (@input_limit), not the
# normalised @limit computed by #update_concurrency.
def limit
  @input_limit
end
limit=(new_limit)
click to toggle source
# File lib/async/limiter/window.rb, line 107
# Changes the limit at runtime.
#
# Validates the new value, recomputes the normalised limit/window,
# resumes waiting fibers and restarts the scheduler when #reschedule?
# says so.
#
# Raises ArgumentError (from #validate_limit!) for a non-positive value.
def limit=(new_limit)
  validate_limit!(new_limit)

  @limit = new_limit
  @input_limit = new_limit
  update_concurrency

  resume_waiting
  reschedule if reschedule?

  limit
end
release()
click to toggle source
# File lib/async/limiter/window.rb, line 100
# Gives back one acquisition by decrementing @count.
def release
  @count -= 1

  # Waiting fibers are only resumed on release when locking is enabled.
  return unless @lock

  resume_waiting
end
sync(*queue_args) { |current| ... }
click to toggle source
# File lib/async/limiter/window.rb, line 62
# Acquires the limiter, yields Task.current to the block, and releases
# afterwards (release is handled by the block form of #acquire).
def sync(*queue_args)
  acquire(*queue_args) { yield Task.current }
end
window()
click to toggle source
# File lib/async/limiter/window.rb, line 45
# Returns the window as it was configured (@input_window), not the
# normalised @window computed by #update_concurrency.
def window
  @input_window
end
window=(new_window)
click to toggle source
# File lib/async/limiter/window.rb, line 118
# Changes the window at runtime.
#
# Validates the new value, recomputes the normalised limit/window,
# resumes waiting fibers and restarts the scheduler when #reschedule?
# says so.
#
# Raises ArgumentError (from #validate_window!) for a non-positive value.
def window=(new_window)
  validate_window!(new_window)

  @window = new_window
  @input_window = new_window
  update_concurrency

  resume_waiting
  reschedule if reschedule?

  window
end
Private Instance Methods
limit_blocking?()
click to toggle source
# File lib/async/limiter/window.rb, line 131
# With locking enabled, true once the number of held acquisitions
# (@count) has reached @limit. Without locking the result is the falsy
# @lock value itself.
def limit_blocking?
  return @lock unless @lock

  @count >= @limit
end
next_acquire_time()
click to toggle source
# File lib/async/limiter/window.rb, line 234
# Time at which the next acquisition may become possible: the start of
# the next window when burstable, otherwise the start of the next
# window frame.
def next_acquire_time
  return @window_start_time + @window if @burstable

  @window_frame_start_time + window_frame
end
reschedule()
click to toggle source
# File lib/async/limiter/window.rb, line 217 def reschedule @scheduler.stop @scheduler = nil schedule end
reschedule?()
click to toggle source
# File lib/async/limiter/window.rb, line 211
# Truthy when a scheduler is running, fibers are waiting, and the limit
# itself is not what blocks them.
def reschedule?
  return @scheduler unless @scheduler

  @waiting.any? && !limit_blocking?
end
resume_waiting()
click to toggle source
# File lib/async/limiter/window.rb, line 224
# Resumes waiting fibers, in queue order, for as long as the limiter is
# not blocking. Fibers that are no longer alive are dropped from the
# queue without being resumed.
def resume_waiting
  until blocking?
    fiber = @waiting.shift
    break unless fiber

    fiber.resume if fiber.alive?
  end

  # Long running non-burstable tasks may end while
  # #window_frame_blocking?. Start a scheduler if one is not running.
  schedule if schedule?
end
schedule(parent: @parent || Task.current)
click to toggle source
Schedule resuming waiting tasks.
# File lib/async/limiter/window.rb, line 189 def schedule(parent: @parent || Task.current) return @scheduler if @scheduler parent.async(transient: true) do |task| @scheduler = task task.annotate("scheduling tasks for #{self.class}.") while @waiting.any? && !limit_blocking? delay = [next_acquire_time - Clock.now, 0].max task.sleep(delay) if delay.positive? # Waits for the task that started the scheduler to yield. # See #wait for more details. @yield_wait && @yield_notification&.wait resume_waiting end ensure @scheduler = nil end end
schedule?()
click to toggle source
# File lib/async/limiter/window.rb, line 182
# Truthy when no scheduler is running, fibers are waiting, and the
# limit itself is not what blocks them.
def schedule?
  return false unless @scheduler.nil?

  @waiting.any? && !limit_blocking?
end
update_concurrency()
click to toggle source
If limit is a decimal number (e.g. 0.5) it needs to be adjusted. Make @limit a whole number and adjust @window appropriately.
# File lib/async/limiter/window.rb, line 248
# Derives the effective @limit and @window from the configured values.
#
# Whole-number and infinite limits are used as configured. A fractional
# limit is turned into a whole number and the window is rescaled by the
# same factor:
#   * below 1                  -> limit 1, window divided by the limit
#   * at least 1, window >= 2  -> limit rounded down
#   * at least 1, window < 2   -> limit rounded up
#
# Raises RuntimeError for a fractional limit matching none of the above
# (e.g. a negative one).
def update_concurrency
  # Start from the configured values.
  @limit = @input_limit
  @window = @input_window

  return if @input_limit.infinite?
  return if (@input_limit % 1).zero?

  # From here on @input_limit is a decimal number.
  if @input_limit >= 0 && @input_limit < 1
    @window = @input_window / @input_limit
    @limit = 1
  elsif @input_limit >= 1
    whole = @input_window >= 2 ? @input_limit.floor : @input_limit.ceil
    @window = @input_window * whole / @input_limit
    @limit = whole
  else
    raise "invalid limit #{@input_limit}"
  end
end
validate!()
click to toggle source
# File lib/async/limiter/window.rb, line 274
# Checks the configured type, limit and window.
#
# Raises ArgumentError when @type is not listed in TYPES, or when the
# limit or window check fails.
def validate!
  raise ArgumentError, "invalid type #{@type.inspect}" unless TYPES.include?(@type)

  validate_limit!
  validate_window!
end
validate_limit!(value = @input_limit)
click to toggle source
# File lib/async/limiter/window.rb, line 283
# Ensures the limit is positive.
#
# value - limit to check; defaults to the configured @input_limit.
#
# Raises ArgumentError when value is not positive.
def validate_limit!(value = @input_limit)
  return if value.positive?

  raise ArgumentError, "limit must be positive number"
end
validate_window!(value = @input_window)
click to toggle source
# File lib/async/limiter/window.rb, line 289
# Ensures the window is positive.
#
# value - window to check; defaults to the configured @input_window.
#
# Raises ArgumentError when value is not positive.
def validate_window!(value = @input_window)
  return if value.positive?

  raise ArgumentError, "window must be positive number"
end
wait(*queue_args)
click to toggle source
# File lib/async/limiter/window.rb, line 157 def wait(*queue_args) fiber = Fiber.current # @waiting.any? check prevents fibers resumed via scheduler from # slipping in operations before other waiting fibers get resumed. if blocking? || @waiting.any? @waiting.push(fiber, *queue_args) # queue_args used for custom queues @yield_wait = true schedule if schedule? # Non-blocking signal, prevents race condition where scheduler would # start resuming waiting fibers before below 'Task.yield' was reached. @yield_notification.signal @yield_wait = false # we're out of the woods loop do Task.yield # run this line at least once break unless blocking? end end rescue Exception # rubocop:disable Lint/RescueException @waiting.delete(fiber) raise end
window_blocking?()
click to toggle source
# File lib/async/limiter/window.rb, line 135
# For burstable limiters: true when the current window is still open
# and already holds @limit acquisitions. Always false when not
# burstable.
def window_blocking?
  if @burstable && !window_changed?
    @window_count >= @limit
  else
    false
  end
end
window_changed?(time = Clock.now)
click to toggle source
# File lib/async/limiter/window.rb, line 149
# True when the current window has ended at the given time.
#
# time - moment to test; defaults to Clock.now.
def window_changed?(time = Clock.now)
  window_end = @window_start_time + @window
  window_end <= time
end
window_frame()
click to toggle source
# File lib/async/limiter/window.rb, line 242
# Length of one window frame: the window divided by the limit, as a
# Float.
def window_frame
  span = @window.to_f
  span / @limit
end
window_frame_blocking?()
click to toggle source
# File lib/async/limiter/window.rb, line 142
# For non-burstable limiters: true while the current window frame has
# not yet ended. Always false when burstable.
def window_frame_blocking?
  !@burstable && !window_frame_changed?
end
window_frame_changed?()
click to toggle source
# File lib/async/limiter/window.rb, line 153
# True when the current window frame has ended, judged against
# Clock.now.
def window_frame_changed?
  frame_end = @window_frame_start_time + window_frame
  frame_end <= Clock.now
end