class Async::Limiter::Window

Constants

NULL_TIME
TYPES

Attributes

count[R]
lock[R]
type[R]

Public Class Methods

new(limit = 1, type: :fixed, window: 1, parent: nil, burstable: true, lock: true, queue: []) click to toggle source
# File lib/async/limiter/window.rb, line 18
# Sets up the limiter state.
#
# limit     - positive number; stored as given in @input_limit and
#             normalised into @limit by #update_concurrency.
# type      - window type; must be one of TYPES (checked by #validate!).
# window    - positive number giving the window length
#             (NOTE(review): presumably seconds, as it is compared with
#             Clock.now — confirm).
# parent    - optional task used as parent for spawned tasks; when nil,
#             Task.current is used at call time.
# burstable - when truthy, blocking is decided per window
#             (#window_blocking?); otherwise per window frame
#             (#window_frame_blocking?).
# lock      - when truthy, @count >= @limit blocks and #release resumes
#             waiting fibers.
# queue     - container holding waiting fibers; this class calls #push,
#             #shift, #any? and #delete on it.
#
# Raises ArgumentError when type, limit or window is invalid.
def initialize(limit = 1, type: :fixed, window: 1, parent: nil,
  burstable: true, lock: true, queue: [])
  @count = 0
  @input_limit = @limit = limit
  @type = type
  @input_window = @window = window
  @parent = parent
  @burstable = burstable
  @lock = lock

  @waiting = queue
  @scheduler = nil
  @yield_wait = false
  @yield_notification = Notification.new

  @window_frame_start_time = NULL_TIME
  @window_start_time = NULL_TIME
  @window_count = 0

  # Validate before normalising, mirroring #limit= and #window=. Otherwise
  # a negative fractional limit reaches #update_concurrency first and
  # surfaces as a bare RuntimeError instead of an ArgumentError.
  validate!
  update_concurrency
end

Public Instance Methods

acquire(*queue_args) { || ... } click to toggle source
# File lib/async/limiter/window.rb, line 68
# Acquires one slot, waiting first if the limiter is currently blocking.
#
# queue_args are forwarded to #wait (used by custom queues).
#
# Without a block the slot stays held and nil is returned; the caller is
# expected to call #release. With a block, the block is run and #release
# is always called afterwards; the block's value is returned.
#
# Raises RuntimeError when a new window starts and @type is neither
# :sliding nor :fixed.
def acquire(*queue_args)
  wait(*queue_args)
  @count += 1

  now = Clock.now

  if window_changed?(now)
    # A new window begins: sliding windows start right now, fixed windows
    # are aligned to a multiple of @window.
    @window_start_time =
      case @type
      when :sliding then now
      when :fixed then (now / @window).to_i * @window
      else raise "invalid type #{@type}"
      end
    @window_count = 1
  else
    @window_count += 1
  end

  @window_frame_start_time = now

  if block_given?
    begin
      yield
    ensure
      release
    end
  end
end
async(*queue_args, parent: (@parent || Task.current), **options) { |task| ... } click to toggle source
# File lib/async/limiter/window.rb, line 53
# Acquires a slot, then runs the given block in a task spawned from parent.
#
# queue_args are forwarded to #acquire; options are forwarded to
# parent.async. The slot is released when the block finishes, whether it
# returns normally or raises. Returns whatever parent.async returns.
def async(*queue_args, parent: (@parent || Task.current), **options)
  acquire(*queue_args)

  parent.async(**options) do |child|
    begin
      yield child
    ensure
      release
    end
  end
end
blocking?() click to toggle source
# File lib/async/limiter/window.rb, line 49
# Tells whether a new acquire has to wait. The checks run in order
# (limit, window, window frame) and stop at the first truthy one, whose
# value is returned; otherwise the window-frame result is returned.
def blocking?
  blocked = limit_blocking?
  return blocked if blocked

  blocked = window_blocking?
  return blocked if blocked

  window_frame_blocking?
end
limit() click to toggle source
# File lib/async/limiter/window.rb, line 41
# Returns the limit exactly as it was supplied (@input_limit), not the
# normalised @limit that #update_concurrency may derive from it.
def limit
  @input_limit
end
limit=(new_limit) click to toggle source
# File lib/async/limiter/window.rb, line 107
# Changes the limit at runtime.
#
# The new value is validated first (ArgumentError if it is not positive),
# then the effective limit/window are recomputed, waiting fibers get a
# chance to resume, and a running scheduler is restarted when
# #reschedule? says so. Returns the value reported by #limit.
def limit=(new_limit)
  validate_limit!(new_limit)

  @limit = new_limit
  @input_limit = new_limit

  update_concurrency
  resume_waiting

  if reschedule?
    reschedule
  end

  limit
end
release() click to toggle source
# File lib/async/limiter/window.rb, line 100
# Gives back one slot by decrementing @count.
#
# Waiting fibers are only resumed here when the limiter is locking;
# otherwise nil is returned and nothing else happens.
def release
  @count = @count - 1
  return unless @lock

  resume_waiting
end
sync(*queue_args) { |current| ... } click to toggle source
# File lib/async/limiter/window.rb, line 62
# Runs the given block while holding a slot, passing it Task.current.
# queue_args are forwarded to #acquire, whose block form handles the
# release. Returns what #acquire returns.
def sync(*queue_args)
  acquire(*queue_args) { yield Task.current }
end
window() click to toggle source
# File lib/async/limiter/window.rb, line 45
# Returns the window exactly as it was supplied (@input_window), not the
# adjusted @window that #update_concurrency may derive from it.
def window
  @input_window
end
window=(new_window) click to toggle source
# File lib/async/limiter/window.rb, line 118
# Changes the window at runtime.
#
# The new value is validated first (ArgumentError if it is not positive),
# then the effective limit/window are recomputed, waiting fibers get a
# chance to resume, and a running scheduler is restarted when
# #reschedule? says so. Returns the value reported by #window.
def window=(new_window)
  validate_window!(new_window)

  @window = new_window
  @input_window = new_window

  update_concurrency
  resume_waiting

  if reschedule?
    reschedule
  end

  window
end

Private Instance Methods

limit_blocking?() click to toggle source
# File lib/async/limiter/window.rb, line 131
# True when locking is enabled and the number of held slots has reached
# the effective limit. With locking disabled the falsy @lock value itself
# is returned.
def limit_blocking?
  return @lock unless @lock

  @count >= @limit
end
next_acquire_time() click to toggle source
# File lib/async/limiter/window.rb, line 234
# Earliest time at which another acquire may succeed: the start of the
# next window when burstable, otherwise the start of the next window
# frame.
def next_acquire_time
  return @window_start_time + @window if @burstable

  @window_frame_start_time + window_frame
end
reschedule() click to toggle source
# File lib/async/limiter/window.rb, line 217
# Replaces the running scheduler with a fresh one. Callers guard this with
# #reschedule?, which requires @scheduler to be set.
def reschedule
  # Stop the current scheduler task, then clear the reference so that
  # #schedule (which returns early while @scheduler is set) starts a new one.
  @scheduler.stop
  @scheduler = nil

  schedule
end
reschedule?() click to toggle source
# File lib/async/limiter/window.rb, line 211
# A running scheduler needs restarting only when fibers are still waiting
# and the limit itself is not what blocks them. Returns the falsy
# @scheduler value when no scheduler is running.
def reschedule?
  return @scheduler unless @scheduler

  pending = @waiting.any?
  return pending unless pending

  !limit_blocking?
end
resume_waiting() click to toggle source
# File lib/async/limiter/window.rb, line 224
# Resumes queued fibers, oldest first, for as long as the limiter is not
# blocking. Fibers that are no longer alive are dropped from the queue
# without being resumed.
def resume_waiting
  until blocking?
    fiber = @waiting.shift
    break unless fiber

    fiber.resume if fiber.alive?
  end

  # Long running non-burstable tasks may end while
  # #window_frame_blocking?. Start a scheduler if one is not running.
  schedule if schedule?
end
schedule(parent: @parent || Task.current) click to toggle source

Schedules a background task that resumes waiting tasks once the next acquire time is reached.

# File lib/async/limiter/window.rb, line 189
# Starts the scheduler task unless one is already recorded in @scheduler.
#
# parent - task used to spawn the scheduler; defaults to @parent or
#          Task.current.
#
# Returns the existing @scheduler when set, otherwise whatever
# parent.async returns.
def schedule(parent: @parent || Task.current)
  return @scheduler if @scheduler

  # NOTE(review): transient presumably keeps this helper task from holding
  # the reactor open — confirm against the async gem documentation.
  parent.async(transient: true) do |task|
    @scheduler = task
    task.annotate("scheduling tasks for #{self.class}.")

    # Keep going only while fibers are queued and the limit itself is not
    # the blocker (a limit block is lifted by #release, not by time).
    while @waiting.any? && !limit_blocking?
      # Sleep until the next window / window frame starts; never negative.
      delay = [next_acquire_time - Clock.now, 0].max
      task.sleep(delay) if delay.positive?

      # Waits for the task that started the scheduler to yield.
      # See #wait for more details.
      @yield_wait && @yield_notification&.wait

      resume_waiting
    end
  ensure
    # Clear the reference however the task ends, so that #schedule? can
    # report true again.
    @scheduler = nil
  end
end
schedule?() click to toggle source
# File lib/async/limiter/window.rb, line 182
# A scheduler should be started only when none is running, fibers are
# waiting, and the limit itself is not what blocks them.
def schedule?
  return false unless @scheduler.nil?

  pending = @waiting.any?
  return pending unless pending

  !limit_blocking?
end
update_concurrency() click to toggle source

If the limit is a fractional number (e.g. 0.5), it needs to be adjusted: @limit is made a whole number and @window is scaled by the same factor, so the effective rate (limit per window) stays the same.

# File lib/async/limiter/window.rb, line 248
# Derives the effective @limit and @window from the user-supplied values.
#
# Whole-number and infinite limits are used as-is. A fractional limit is
# replaced by a whole number and the window is scaled by the same factor,
# so limit / window keeps its value:
#   * below 1         -> limit 1, window stretched
#   * 1 or more       -> limit floored when the input window is >= 2,
#                        otherwise ceiled
# Raises RuntimeError for a fractional limit below zero.
def update_concurrency
  raw_limit = @input_limit
  raw_window = @input_window

  # Start from the untouched inputs.
  @limit = raw_limit
  @window = raw_window

  return if raw_limit.infinite? || (raw_limit % 1).zero?

  # raw_limit is a decimal number from here on.
  if raw_limit >= 0 && raw_limit < 1
    @window = raw_window / raw_limit
    @limit = 1
  elsif raw_limit >= 1
    whole = raw_window >= 2 ? raw_limit.floor : raw_limit.ceil
    @window = raw_window * whole / raw_limit
    @limit = whole
  else
    raise "invalid limit #{raw_limit}"
  end
end
validate!() click to toggle source
# File lib/async/limiter/window.rb, line 274
# Checks the whole configuration: the type must be listed in TYPES, and
# the stored limit and window must pass their own validators.
# Raises ArgumentError on the first problem found.
def validate!
  raise ArgumentError, "invalid type #{@type.inspect}" unless TYPES.include?(@type)

  validate_limit!
  validate_window!
end
validate_limit!(value = @input_limit) click to toggle source
# File lib/async/limiter/window.rb, line 283
# Ensures the limit is a positive number.
#
# value - candidate limit; defaults to the stored @input_limit.
#
# Returns nil when the value is acceptable.
# Raises ArgumentError when the value is zero, negative, NaN, or does not
# respond to #positive? at all (nil, String, ...). The last case used to
# escape as NoMethodError.
def validate_limit!(value = @input_limit)
  return if value.respond_to?(:positive?) && value.positive?

  raise ArgumentError, "limit must be positive number"
end
validate_window!(value = @input_window) click to toggle source
# File lib/async/limiter/window.rb, line 289
# Ensures the window is a positive number.
#
# value - candidate window; defaults to the stored @input_window.
#
# Returns nil when the value is acceptable.
# Raises ArgumentError when the value is zero, negative, NaN, or does not
# respond to #positive? at all (nil, String, ...). The last case used to
# escape as NoMethodError.
def validate_window!(value = @input_window)
  return if value.respond_to?(:positive?) && value.positive?

  raise ArgumentError, "window must be positive number"
end
wait(*queue_args) click to toggle source
# File lib/async/limiter/window.rb, line 157
# Suspends the current fiber until the limiter stops blocking. Returns
# straight away when nothing is blocking and no other fiber is queued.
#
# queue_args are passed to @waiting.push together with the fiber.
#
# Any exception raised while waiting removes the fiber from the queue and
# is re-raised unchanged.
def wait(*queue_args)
  fiber = Fiber.current

  # @waiting.any? check prevents fibers resumed via scheduler from
  # slipping in operations before other waiting fibers get resumed.
  if blocking? || @waiting.any?
    @waiting.push(fiber, *queue_args) # queue_args used for custom queues
    # Flag read by the scheduler loop in #schedule, which then waits on
    # @yield_notification before resuming anyone.
    @yield_wait = true
    schedule if schedule?

    # Non-blocking signal, prevents race condition where scheduler would
    # start resuming waiting fibers before below 'Task.yield' was reached.
    @yield_notification.signal
    @yield_wait = false # we're out of the woods

    # Yield repeatedly; each time control comes back, re-check whether the
    # limiter is still blocking.
    loop do
      Task.yield # run this line at least once
      break unless blocking?
    end
  end
# Exception (not just StandardError) is rescued deliberately so the queue
# entry is cleaned up on every kind of failure before re-raising.
rescue Exception # rubocop:disable Lint/RescueException
  @waiting.delete(fiber)
  raise
end
window_blocking?() click to toggle source
# File lib/async/limiter/window.rb, line 135
# Burstable mode only: true when the current window is still open and has
# already seen @limit acquisitions. Always false when not burstable or
# once the window has rolled over.
def window_blocking?
  if @burstable && !window_changed?
    @window_count >= @limit
  else
    false
  end
end
window_changed?(time = Clock.now) click to toggle source
# File lib/async/limiter/window.rb, line 149
# True once the given time (Clock.now by default) has reached or passed
# the end of the current window.
def window_changed?(time = Clock.now)
  window_end = @window_start_time + @window
  window_end <= time
end
window_frame() click to toggle source
# File lib/async/limiter/window.rb, line 242
# Length of one window frame: the effective window divided evenly by the
# effective limit, computed as a float.
def window_frame
  span = @window.to_f
  span / @limit
end
window_frame_blocking?() click to toggle source
# File lib/async/limiter/window.rb, line 142
# Non-burstable mode only: true while the current window frame has not
# yet elapsed. Always false when burstable.
def window_frame_blocking?
  !(@burstable || window_frame_changed?)
end
window_frame_changed?() click to toggle source
# File lib/async/limiter/window.rb, line 153
# True once Clock.now has reached or passed the end of the window frame
# that began at @window_frame_start_time.
def window_frame_changed?
  frame_end = @window_frame_start_time + window_frame
  frame_end <= Clock.now
end