class Async::Limiter::Concurrent
Attributes
count[R]
limit[R]
Public Class Methods
new(limit = 1, parent: nil, queue: [])
click to toggle source
# File lib/async/limiter/concurrent.rb, line 11
#
# Sets up a limiter with no slots taken.
#
# @param limit [Numeric] maximum number of slots that may be held at once;
#   checked by +validate!+ before the constructor returns
# @param parent [Object, nil] stored as the default parent used by +async+
#   and +sync+; when nil those methods fall back to +Task.current+
# @param queue [#push, #shift, #delete] collection that holds waiting fibers
# @raise [ArgumentError] when +validate!+ rejects the limit
def initialize(limit = 1, parent: nil, queue: [])
  @limit = limit
  @parent = parent
  @waiting = queue
  @count = 0 # nothing acquired yet

  validate!
end
Public Instance Methods
acquire(*queue_args) { || ... }
click to toggle source
# File lib/async/limiter/concurrent.rb, line 39
#
# Takes one slot, waiting first if the limiter is currently blocking.
#
# Without a block the slot stays held until the caller invokes +release+.
# With a block the slot is released as soon as the block finishes, even when
# it raises.
#
# @param queue_args [Array] forwarded unchanged to +wait+
# @yield runs while the slot is held
# @return [Object, nil] the block's value, or nil when no block is given
def acquire(*queue_args)
  wait(*queue_args)
  @count += 1

  if block_given?
    begin
      yield
    ensure
      release
    end
  end
end
async(*queue_args, parent: (@parent || Task.current), **options) { |task| ... }
click to toggle source
# File lib/async/limiter/concurrent.rb, line 24
#
# Takes a slot and then runs the given block through +parent.async+.
#
# The slot is acquired before +parent.async+ is called, so the caller waits
# here while the limiter is blocking. The slot is released when the block
# finishes, whether it returns or raises.
#
# @param queue_args [Array] forwarded unchanged to +acquire+
# @param parent [#async] object the task is started on; defaults to the
#   parent given to the constructor, or +Task.current+ when none was given
# @param options [Hash] keyword arguments passed straight to +parent.async+
# @yieldparam task [Object] whatever +parent.async+ yields to its block
# @return [Object] the return value of +parent.async+
def async(*queue_args, parent: (@parent || Task.current), **options)
  acquire(*queue_args)

  parent.async(**options) do |task|
    begin
      yield(task)
    ensure
      release
    end
  end
end
blocking?()
click to toggle source
# File lib/async/limiter/concurrent.rb, line 20
#
# Reports whether a new acquisition would have to wait.
#
# @return [Boolean] the answer given by +limit_blocking?+
def blocking?
  limit_blocking?
end
limit=(new_limit)
click to toggle source
# File lib/async/limiter/concurrent.rb, line 58
#
# Replaces the limit after validating it.
#
# The new value is held to the same rules the constructor enforces: it must
# pass +validate_limit!+ and, when finite, it must be a whole number.
# Previously only the first rule was applied here, so a fractional limit
# rejected by the constructor could still be assigned afterwards.
#
# @param new_limit [Numeric] the limit to use from now on
# @raise [ArgumentError] when +validate_limit!+ rejects the value, or when
#   it is finite but not a whole number
def limit=(new_limit)
  # Range check first, so values that were already rejected keep their error.
  validate_limit!(new_limit)

  # Non-finite values (e.g. Float::INFINITY) skip the whole-number test.
  if new_limit.finite? && (new_limit % 1).nonzero?
    raise ArgumentError, "limit must be a whole number"
  end

  @limit = new_limit
end
release()
click to toggle source
# File lib/async/limiter/concurrent.rb, line 52
#
# Gives back one slot and lets waiting fibers proceed if capacity allows.
#
# @return [Object] whatever +resume_waiting+ returns
def release
  @count -= 1

  resume_waiting
end
sync(*queue_args) { |parent || current| ... }
click to toggle source
# File lib/async/limiter/concurrent.rb, line 33
#
# Runs the block in the calling fiber while holding one slot.
#
# The slot is taken through the block form of +acquire+, which releases it
# when the block finishes.
#
# @param queue_args [Array] forwarded unchanged to +acquire+
# @yieldparam task [Object] the parent given to the constructor, or
#   +Task.current+ when none was given
# @return [Object] the value returned by +acquire+
def sync(*queue_args)
  acquire(*queue_args) { yield(@parent || Task.current) }
end
Private Instance Methods
limit_blocking?()
click to toggle source
# File lib/async/limiter/concurrent.rb, line 66
#
# Tells whether every slot is taken.
#
# @return [Boolean] true once the number of held slots has reached the limit
def limit_blocking?
  @limit <= @count
end
resume_waiting()
click to toggle source
# File lib/async/limiter/concurrent.rb, line 82
#
# Wakes queued fibers, in queue order, until the limiter blocks again or the
# queue runs dry.
#
# Entries that are no longer alive are dropped without being resumed.
#
# @return [nil]
def resume_waiting
  until blocking?
    candidate = @waiting.shift
    break unless candidate # queue is empty

    candidate.resume if candidate.alive?
  end
end
validate!()
click to toggle source
# File lib/async/limiter/concurrent.rb, line 88
#
# Checks the configured limit.
#
# A finite limit has to be a whole number; non-finite limits skip that test.
# The range check is then delegated to +validate_limit!+.
#
# @raise [ArgumentError] when the limit is finite but fractional, or when
#   +validate_limit!+ rejects it
def validate!
  fractional = @limit.finite? && (@limit % 1).nonzero?
  raise ArgumentError, "limit must be a whole number" if fractional

  validate_limit!
end
validate_limit!(value = @limit)
click to toggle source
# File lib/async/limiter/concurrent.rb, line 96
#
# Rejects limits below 1.
#
# A limit of exactly 1 is valid. The error message now states that bound;
# it used to read "greater than 1", which contradicted the check itself.
#
# @param value [Numeric] limit to check; defaults to the current limit
# @return [nil] when the value is acceptable
# @raise [ArgumentError] when +value+ is less than 1
def validate_limit!(value = @limit)
  raise ArgumentError, "limit must be greater than or equal to 1" if value < 1
end
wait(*queue_args)
click to toggle source
# File lib/async/limiter/concurrent.rb, line 70
#
# Suspends the calling fiber until the limiter stops blocking.
#
# Returns straight away when the limiter is not blocking. Otherwise the
# current fiber is pushed onto the waiting queue and +Task.yield+ is called
# repeatedly until +blocking?+ turns false.
#
# @param queue_args [Array] extra arguments passed to the queue's +push+
# @return [nil]
# @raise [Exception] re-raises anything raised while waiting, after removing
#   the fiber from the queue
def wait(*queue_args)
  current = Fiber.current

  begin
    return unless blocking?

    @waiting.push(current, *queue_args) # queue_args used for custom queues
    Task.yield while blocking?
  rescue Exception # rubocop:disable Lint/RescueException
    # Remove this fiber from the queue before re-raising.
    @waiting.delete(current)
    raise
  end
end