class Async::Limiter::Concurrent

Attributes

count[R]
limit[R]

Public Class Methods

new(limit = 1, parent: nil, queue: []) click to toggle source
# File lib/async/limiter/concurrent.rb, line 11
# Builds a limiter that starts with zero acquired slots.
#
# limit  - threshold the acquired-slot counter is compared against (default 1).
# parent - remembered and used by #async / #sync in preference to Task.current.
# queue  - collection used to hold waiting fibers (default: a new Array).
#
# validate! runs last, once all state is assigned, so an unacceptable limit
# raises from the constructor.
def initialize(limit = 1, parent: nil, queue: [])
  @parent = parent
  @waiting = queue
  @count, @limit = 0, limit

  validate!
end

Public Instance Methods

acquire(*queue_args) { || ... } click to toggle source
# File lib/async/limiter/concurrent.rb, line 39
# Takes one slot, first going through #wait (which receives queue_args).
#
# With a block: runs the block while holding the slot, releases the slot
# afterwards even if the block raises, and returns the block's value.
# Without a block: returns nil and the slot stays held until #release is
# called.
def acquire(*queue_args)
  wait(*queue_args)
  @count += 1

  if block_given?
    begin
      yield
    ensure
      release
    end
  end
end
async(*queue_args, parent: (@parent || Task.current), **options) { |task| ... } click to toggle source
# File lib/async/limiter/concurrent.rb, line 24
# Acquires a slot, then starts the given block through parent.async.
#
# queue_args are forwarded to #acquire; any extra keyword options are
# forwarded to parent.async. parent defaults to the limiter's own parent
# when one was configured, otherwise to Task.current.
#
# The block receives the task yielded by parent.async, and the slot is
# released when the block finishes, whether it returns or raises.
# Returns whatever parent.async returns.
def async(*queue_args, parent: (@parent || Task.current), **options)
  acquire(*queue_args)

  parent.async(**options) do |task|
    begin
      yield task
    ensure
      release
    end
  end
end
blocking?() click to toggle source
# File lib/async/limiter/concurrent.rb, line 20
# Reports whether the limiter currently refuses new acquisitions.
#
# Delegates entirely to the private limit_blocking? check; #wait and
# resume_waiting consult this predicate.
def blocking?
  limit_blocking?
end
limit=(new_limit) click to toggle source
# File lib/async/limiter/concurrent.rb, line 58
# Replaces the limit after checking it with validate_limit!.
#
# If validation raises, the previous limit is left in place.
# Note: only validate_limit! is applied here, not the whole-number check
# that validate! performs at construction time.
def limit=(new_limit)
  validate_limit!(new_limit)
  @limit = new_limit
end
release() click to toggle source
# File lib/async/limiter/concurrent.rb, line 52
# Gives back one slot, then lets resume_waiting wake queued fibers.
#
# The counter is decremented before resume_waiting runs, so the freed slot
# is already visible to it. Returns whatever resume_waiting returns.
def release
  @count -= 1
  resume_waiting
end
sync(*queue_args) { |parent || current| ... } click to toggle source
# File lib/async/limiter/concurrent.rb, line 33
# Runs the block in the calling fiber while holding a slot.
#
# queue_args are forwarded to #acquire. The block is handed the limiter's
# configured parent, or Task.current when none was configured. The slot is
# released by #acquire once the block finishes; the block's value is
# returned.
def sync(*queue_args)
  acquire(*queue_args) { yield(@parent || Task.current) }
end

Private Instance Methods

limit_blocking?() click to toggle source
# File lib/async/limiter/concurrent.rb, line 66
# True once the number of acquired slots has reached (or passed) the limit.
def limit_blocking?
  @limit <= @count
end
resume_waiting() click to toggle source
# File lib/async/limiter/concurrent.rb, line 82
# Wakes queued fibers, front of the queue first, for as long as the limiter
# is not blocking.
#
# Each iteration removes one entry from the waiting queue. Entries whose
# fiber is no longer alive are dropped without being resumed. The loop stops
# when the limiter blocks again or the queue yields nothing. Returns nil.
def resume_waiting
  until blocking?
    waiter = @waiting.shift
    break unless waiter

    waiter.resume if waiter.alive?
  end
end
validate!() click to toggle source
# File lib/async/limiter/concurrent.rb, line 88
# Checks the limit assigned in the constructor.
#
# A finite limit must have no fractional part; non-finite limits (such as
# Float::INFINITY) skip that check. The remaining checks are delegated to
# validate_limit!.
#
# Raises ArgumentError when the limit is finite but not a whole number.
def validate!
  fractional = @limit.finite? && (@limit % 1).nonzero?
  raise ArgumentError, "limit must be a whole number" if fractional

  validate_limit!
end
validate_limit!(value = @limit) click to toggle source
# File lib/async/limiter/concurrent.rb, line 96
# Ensures a limit value is at least 1.
#
# value - the limit to check (defaults to the current @limit).
#
# Returns nil when the value is acceptable.
# Raises ArgumentError when the value is below 1 or is NaN.
def validate_limit!(value = @limit)
  # Written as `unless value >= 1` rather than `if value < 1` so that NaN,
  # for which every comparison is false, is rejected too: a NaN limit would
  # make `@count >= @limit` permanently false and the limiter would never
  # block.
  raise ArgumentError, "limit must be greater than or equal to 1" unless value >= 1
end
wait(*queue_args) click to toggle source
# File lib/async/limiter/concurrent.rb, line 70
# Suspends the calling fiber until the limiter stops blocking.
#
# When the limiter is not blocking this returns immediately without touching
# the queue. Otherwise the current fiber (plus any queue_args, which exist
# for custom queue implementations) is pushed onto the waiting queue and the
# task yields repeatedly until #blocking? turns false.
#
# If anything is raised along the way, the fiber is removed from the waiting
# queue and the exception is re-raised unchanged. Returns nil.
def wait(*queue_args)
  current = Fiber.current
  return unless blocking?

  @waiting.push(current, *queue_args) # queue_args used for custom queues
  Task.yield while blocking?
rescue Exception # rubocop:disable Lint/RescueException
  # Cleanup only: the exception is always re-raised below.
  @waiting.delete(current)
  raise
end