module Delayer::Extend
Attributes
exception[R]
expire[RW]
Public Class Methods
extended(klass)
click to toggle source
# File lib/delayer/extend.rb, line 58
# Hook invoked when a class extends this module. Sets up the class-level
# instance variables that the other methods of this module read and write.
#
# ==== Args
# [klass] the class that extended this module
def self.extended(klass)
  klass.class_eval do
    # Synchronization primitive handed out by #lock.
    @lock = Monitor.new

    # Job queue for the current stash level. The 4th argument is the stashed
    # (outer) bucket; the first three are presumably first/last/priority_of
    # -- TODO confirm against the Bucket definition.
    @bucket = Bucket.new(nil, nil, {}, nil)

    # Reserved (delayed) jobs: the earliest one is kept apart from the rest.
    @last_reserve = nil
    @reserves = Set.new

    # Run state and remain-hook bookkeeping.
    @busy = false
    @expire = 0
    @exception = nil
    @remain_hook = nil
    @remain_received = false
  end
end
Public Instance Methods
busy?()
click to toggle source
empty?()
click to toggle source
expire?()
click to toggle source
# File lib/delayer/extend.rb, line 108
# Tells whether the deadline of the run currently in progress has passed.
#
# @end_time is derived from Process::CLOCK_MONOTONIC by #run, so it has to be
# compared against the same clock; comparing it against wall-clock time
# (seconds since the epoch) would report "expired" almost unconditionally.
#
# ==== Return
# true if @end_time is set and lies in the past, false otherwise
def expire?
  !!@end_time&.<(Process.clock_gettime(Process::CLOCK_MONOTONIC))
end
get_prev_point(priority)
click to toggle source
# File lib/delayer/extend.rb, line 211
# Looks up the pointer recorded in @bucket.priority_of for +priority+. When
# there is none, steps back through @priorities one entry at a time and
# returns the first recorded pointer found.
#
# ==== Args
# [priority] priority to start the search from
# ==== Return
# the recorded pointer, or nil if +priority+ is not listed in @priorities or
# no earlier entry has a pointer
def get_prev_point(priority)
  current = priority
  while true
    pointer = @bucket.priority_of[current]
    return pointer if pointer

    position = @priorities.index(current)
    # Unknown priority, or already at the front of the list: nothing before it.
    return nil if position.nil? || position - 1 < 0

    current = @priorities[position - 1]
  end
end
pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC))
click to toggle source
# File lib/delayer/extend.rb, line 72
# Registers every reserved job whose reserve_at is not later than
# +start_time+, refilling @last_reserve from the minimum of @reserves after
# each one.
#
# ==== Args
# [start_time] reference time; defaults to the current monotonic clock value
# ==== Return
# nil
def pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  due = -> { @last_reserve&.reserve_at&.<=(start_time) }

  # Check once without the lock so the common "nothing due" case stays cheap.
  return unless due.call

  lock.synchronize do
    # Re-checked under the lock on every pass.
    while due.call
      @last_reserve.register
      @last_reserve = @reserves.min
      @reserves.delete(@last_reserve)
    end
  end
end
register(procedure)
click to toggle source
Register a new job.
Args¶ ↑
- procedure - job (Delayer::Procedure)
Return¶ ↑
self
# File lib/delayer/extend.rb, line 165
# Register a new job.
#
# ==== Args
# [procedure] job (Delayer::Procedure)
# ==== Return
# self
def register(procedure)
  level = procedure.delayer.priority
  lock.synchronize do
    anchor = get_prev_point(level)
    tails = @bucket.priority_of
    if anchor
      # Link the job in via the anchor's #break and record whatever it returns
      # as the pointer for this priority.
      tails[level] = anchor.break(procedure)
    else
      # No pointer at this or any earlier priority: the job becomes the head.
      procedure.next = @bucket.first
      @bucket.first = procedure
      tails[level] = procedure
    end
    @bucket.last = tails[level] if @bucket.last

    # Fire the remain hook only if it has not been signalled already.
    hook_pending = @remain_hook && !@remain_received
    if hook_pending
      @remain_received = true
      @remain_hook.call
    end
  end
  self
end
register_remain_hook(&proc)
click to toggle source
# File lib/delayer/extend.rb, line 207
# Stores the given block as the remain hook. #register calls it when a job is
# added and the hook has not been signalled yet; #run calls it when jobs are
# still left after running.
#
# ==== Return
# the stored block (nil when no block was given)
def register_remain_hook(&proc)
  @remain_hook = proc
end
reserve(procedure)
click to toggle source
Register a reserved job. It is not executed immediately; register() is called on it once the time given by procedure.reserve_at has been reached.
Args¶ ↑
- procedure
Return¶ ↑
self
# File lib/delayer/extend.rb, line 191
# Register a reserved job. It is not run here; #pop_reserve registers it once
# its reserve_at has been reached.
#
# ==== Args
# [procedure] the reserved job; compared with other reserved jobs using >
# ==== Return
# self
def reserve(procedure)
  lock.synchronize do
    if !@last_reserve
      # First reservation: it becomes the one #pop_reserve inspects.
      @last_reserve = procedure
    elsif @last_reserve > procedure
      # The newcomer sorts before the current one: swap them.
      @reserves << @last_reserve
      @last_reserve = procedure
    else
      @reserves << procedure
    end
  end
  self
end
run(current_expire = @expire)
click to toggle source
Run registered jobs.
Args¶ ↑
- current_expire - time limit for processing (secs, 0 = no limit)
Return¶ ↑
self
# File lib/delayer/extend.rb, line 89
# Run registered jobs.
#
# Any exception raised while running is stored in @exception and re-raised.
#
# ==== Args
# [current_expire] time limit for processing (secs, 0 = no limit)
# ==== Return
# self
def run(current_expire = @expire)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_f
  pop_reserve(start_time)
  if current_expire == 0
    run_once_without_pop_reserve until empty?
  else
    # The deadline honours the argument, not only the @expire default.
    @end_time = deadline = start_time + current_expire
    begin
      run_once_without_pop_reserve while !empty? && (deadline >= Process.clock_gettime(Process::CLOCK_MONOTONIC))
    ensure
      # Always clear the deadline, even when a job raised, so that #expire?
      # does not keep reporting a stale one.
      @end_time = nil
    end
  end
  if @remain_hook
    @remain_received = !empty?
    @remain_hook.call if @remain_received
  end
  self
rescue Exception => e
  # Deliberately broad: the exception is only recorded here, then re-raised.
  @exception = e
  raise
end
run_once()
click to toggle source
size(node = @bucket.first)
click to toggle source
stash_enter!()
click to toggle source
DelayerのStashレベルをインクリメントする。 このメソッドが呼ばれたら、その時存在するジョブは退避され、stash_exit!が呼ばれるまで実行されない。
# File lib/delayer/extend.rb, line 230
# Increments the Delayer stash level. Once this is called, the jobs that exist
# at that moment are set aside and are not run until stash_exit! is called.
#
# ==== Return
# self
def stash_enter!
  outer = @bucket
  # The fresh bucket keeps the previous one as its stashed bucket.
  @bucket = Bucket.new(nil, nil, {}, outer)
  self
end
stash_exit!()
click to toggle source
DelayerのStashレベルをデクリメントする。 このメソッドを呼ぶ前に、現在のレベルに存在するすべてのジョブを実行し、Delayer#empty?がtrueを返すような状態になっている必要がある。
Raises¶ ↑
Delayer::NoLowerLevelError
-
stash_enter!が呼ばれていない時
Delayer::RemainJobsError
-
ジョブが残っているのにこのメソッドを呼んだ時
# File lib/delayer/extend.rb, line 240
# Decrements the Delayer stash level. Before calling this, every job at the
# current level must have been run, so that empty? returns true.
#
# ==== Raises
# [Delayer::NoLowerLevelError] when stash_enter! has not been called
# [Delayer::RemainJobsError] when called while jobs still remain
# ==== Return
# the bucket that was restored
def stash_exit!
  lower = @bucket.stashed
  if !lower
    raise Delayer::NoLowerLevelError, 'stash_exit! called in level 0.'
  end
  if !empty?
    raise Delayer::RemainJobsError, 'Current level has remain jobs. It must be empty current level jobs in call this method.'
  end

  @bucket = lower
end
stash_level()
click to toggle source
現在のDelayer Stashレベルを返す。
# File lib/delayer/extend.rb, line 249
# Returns the current Delayer stash level, as reported by the current
# bucket's stash_size.
def stash_level
  current = @bucket
  current.stash_size
end
validate_priority(symbol)
click to toggle source
# File lib/delayer/extend.rb, line 222
# Checks that +symbol+ is one of the priorities listed in @priorities.
#
# ==== Args
# [symbol] priority to check
# ==== Raises
# [Delayer::InvalidPriorityError] when +symbol+ is not in @priorities
# ==== Return
# nil
def validate_priority(symbol)
  return if @priorities.include?(symbol)

  raise Delayer::InvalidPriorityError, "undefined priority '#{symbol}'"
end
Private Instance Methods
forward()
click to toggle source
# File lib/delayer/extend.rb, line 255
# Detaches the first job from the current bucket and returns it.
#
# ==== Raises
# [RuntimeError] when the bucket has no first job
# ==== Return
# the detached job, with its next link cleared
def forward
  lock.synchronize do
    head = @bucket.first
    raise 'Current bucket not found' unless head

    successor = head.next
    @bucket.first = successor
    @bucket.last = nil unless successor

    # Any priority pointer that referred to the detached job is moved on to
    # its successor (which may be nil).
    tails = @bucket.priority_of
    tails.each do |priority, tail|
      tails[priority] = successor if head == tail
    end

    head.next = nil
    head
  end
end
lock()
click to toggle source
# File lib/delayer/extend.rb, line 269
# Returns the object held in @lock; the other methods of this module wrap
# their queue updates in lock.synchronize.
def lock
  @lock
end
run_once_without_pop_reserve()
click to toggle source
# File lib/delayer/extend.rb, line 120
# Runs at most one job from the current bucket, discarding canceled jobs met
# on the way. Reserved jobs are not touched. @busy is true while this runs and
# is always reset to false afterwards.
#
# ==== Return
# the result of the job's run, or nil when no job was run
def run_once_without_pop_reserve
  return unless @bucket.first

  @busy = true
  job = forward
  # Keep pulling while the job in hand is canceled and more jobs are queued.
  job = forward while @bucket.first && job&.canceled?
  job.run if job && !job.canceled?
ensure
  @busy = false
end