class Ztimer
Constants
- VERSION
Attributes
concurrency[R]
count[R]
queue[R]
running[R]
watcher[R]
Public Class Methods
method_missing(name, *args, &block)
click to toggle source
# File lib/ztimer.rb, line 71 def self.method_missing(name, *args, &block) @default_instance ||= Ztimer.new(concurrency: 20) @default_instance.send(name, *args, &block) end
new(concurrency: 20)
click to toggle source
# File lib/ztimer.rb, line 11 def initialize(concurrency: 20) @concurrency = concurrency @watcher = Ztimer::Watcher.new(){|slot| execute(slot) } @workers_lock = Mutex.new @count_lock = Mutex.new @queue = Queue.new @running = 0 @count = 0 end
Public Instance Methods
after(milliseconds, &callback)
click to toggle source
# File lib/ztimer.rb, line 31 def after(milliseconds, &callback) enqueued_at = utc_microseconds expires_at = enqueued_at + milliseconds * 1000 slot = Slot.new(enqueued_at, expires_at, -1, &callback) add(slot) return slot end
async(&callback)
click to toggle source
# File lib/ztimer.rb, line 21 def async(&callback) enqueued_at = utc_microseconds slot = Slot.new(enqueued_at, enqueued_at, -1, &callback) incr_counter! execute(slot) return slot end
concurrency=(new_value)
click to toggle source
# File lib/ztimer.rb, line 55 def concurrency=(new_value) raise ArgumentError.new("Invalid concurrency value: #{new_value}") unless new_value.is_a?(Fixnum) && new_value >= 1 @concurrency = new_value end
every(milliseconds, &callback)
click to toggle source
# File lib/ztimer.rb, line 41 def every(milliseconds, &callback) enqueued_at = utc_microseconds expires_at = enqueued_at + milliseconds * 1000 slot = Slot.new(enqueued_at, expires_at, milliseconds * 1000, &callback) add(slot) return slot end
jobs_count()
click to toggle source
# File lib/ztimer.rb, line 51 def jobs_count return @watcher.jobs end
stats()
click to toggle source
# File lib/ztimer.rb, line 61 def stats { running: @running, scheduled: @watcher.jobs, executing: @queue.size, total: @count } end
Protected Instance Methods
add(slot)
click to toggle source
# File lib/ztimer.rb, line 78 def add(slot) incr_counter! @watcher << slot end
execute(slot)
click to toggle source
# File lib/ztimer.rb, line 87 def execute(slot) @queue << slot @workers_lock.synchronize do [@concurrency - @running, @queue.size].min.times do @running += 1 start_new_thread! end end end
incr_counter!()
click to toggle source
# File lib/ztimer.rb, line 83 def incr_counter! @count_lock.synchronize{ @count += 1 } end
start_new_thread!()
click to toggle source
# File lib/ztimer.rb, line 98 def start_new_thread! worker = Thread.new do begin loop do current_slot = nil @workers_lock.synchronize do current_slot = @queue.pop(true) unless @queue.empty? end break if current_slot.nil? begin current_slot.executed_at = utc_microseconds current_slot.callback.call(current_slot) unless current_slot.callback.nil? || current_slot.canceled? rescue => e STDERR.puts e.inspect + (e.backtrace ? "\n" + e.backtrace.join("\n") : "") end end rescue ThreadError puts "queue is empty" end @workers_lock.synchronize { @running -= 1 } end worker.abort_on_exception = true end
utc_microseconds()
click to toggle source
# File lib/ztimer.rb, line 123 def utc_microseconds return Time.now.to_f * 1_000_000 end