module Delayer::Extend

Attributes

exception[R]
expire[RW]

Public Class Methods

extended(klass) click to toggle source
# File lib/delayer/extend.rb, line 58
# Hook invoked when this module is extended into +klass+.
# Installs the initial Delayer state as instance variables of +klass+.
# ==== Args
# [klass] the object being extended (it must respond to class_eval)
def self.extended(klass)
  klass.class_eval do
    # Nothing is running and no "remain" notification is pending.
    @busy = @remain_received = false
    # 0 makes #run process jobs without a time limit.
    @expire = 0
    # No hook, no recorded exception and no reserved job yet.
    @remain_hook = @exception = @last_reserve = nil
    @lock = Monitor.new
    # Empty bucket; arguments are presumably (first, last, priority_of, stashed)
    # -- confirm against the Bucket definition.
    @bucket = Bucket.new(nil, nil, {}, nil)
    @reserves = Set.new
  end
end

Public Instance Methods

busy?() click to toggle source

Returns whether a job is being processed right now.

Args

args

Return

true if Delayer processing job

# File lib/delayer/extend.rb, line 138
# Tell whether a job is being processed right now.
# The flag is raised while run_once_without_pop_reserve executes a job and
# cleared again when it finishes.
# ==== Return
# true if Delayer is processing a job
def busy?
  @busy
end
empty?() click to toggle source

Returns true if there are no jobs.

Return

true if there are no jobs.

# File lib/delayer/extend.rb, line 145
# Tell whether the current bucket holds no job.
# ==== Return
# true if the current bucket has no first job
def empty?
  head = @bucket.first
  !head
end
expire?() click to toggle source
# File lib/delayer/extend.rb, line 108
# Tell whether the deadline of the time-limited run in progress has passed.
#
# #run stores the deadline in @end_time as a Process::CLOCK_MONOTONIC reading,
# so it must be compared with the same clock. Comparing it with wall-clock
# epoch seconds would report "expired" for practically every deadline.
# ==== Return
# true if a deadline is set and already over, false otherwise
# (including when no time-limited run is in progress)
def expire?
  deadline = @end_time
  !deadline.nil? && deadline < Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
get_prev_point(priority) click to toggle source
# File lib/delayer/extend.rb, line 211
# Find the job recorded for +priority+ in the current bucket, falling back to
# the entries that come before it in @priorities.
# ==== Args
# [priority] priority to look up
# ==== Return
# the pointer stored in @bucket.priority_of for +priority+ or, failing that,
# for the nearest earlier entry of @priorities that has one; nil if none is
# found or +priority+ is not listed in @priorities
def get_prev_point(priority)
  pointer = @bucket.priority_of[priority]
  return pointer if pointer

  position = @priorities.index(priority)
  # Unknown priority, or already at the first one: nothing earlier to try.
  return nil if position.nil? || position < 1

  get_prev_point(@priorities[position - 1])
end
pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)) click to toggle source
# File lib/delayer/extend.rb, line 72
# Move every reserved job whose reserve_at is not later than +start_time+ into
# the job list by calling its #register, then promote the smallest remaining
# entry of @reserves to @last_reserve.
# ==== Args
# [start_time] reference time (defaults to the current Process::CLOCK_MONOTONIC reading)
# ==== Return
# nil
def pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  due = -> { @last_reserve&.reserve_at&.<=(start_time) }
  # Unlocked fast path: skip taking the lock when nothing is due.
  return unless due.call

  lock.synchronize do
    # Checked again under the lock before each pop.
    while due.call
      @last_reserve.register
      @last_reserve = @reserves.min
      @reserves.delete(@last_reserve)
    end
  end
end
register(procedure) click to toggle source

register new job.

Args

procedure

job(Delayer::Procedure)

Return

self

# File lib/delayer/extend.rb, line 165
# Register a new job in the current bucket.
# ==== Args
# [procedure] job(Delayer::Procedure)
# ==== Return
# self
def register(procedure)
  priority = procedure.delayer.priority
  lock.synchronize do
    anchor = get_prev_point(priority)
    newest =
      if anchor
        # Presumably links +procedure+ right after +anchor+ and returns it --
        # confirm in Delayer::Procedure#break.
        anchor.break(procedure)
      else
        # No job to insert after: the new job becomes the head of the list.
        procedure.next = @bucket.first
        @bucket.first = procedure
      end
    @bucket.priority_of[priority] = newest
    @bucket.last = @bucket.priority_of[priority] if @bucket.last

    # Fire the remain hook once; the flag is recomputed by #run.
    notify = @remain_hook && !@remain_received
    if notify
      @remain_received = true
      @remain_hook.call
    end
  end
  self
end
register_remain_hook(&proc) click to toggle source
# File lib/delayer/extend.rb, line 207
# Install the block that #register and #run call when jobs are waiting.
# Calling it without a block removes the hook.
# ==== Return
# the given block as a Proc (nil when no block was given)
def register_remain_hook(&hook)
  @remain_hook = hook
end
reserve(procedure) click to toggle source

Registers a reserved job. The job is not executed immediately; register() is called for it once procedure.reserve_at is reached.

Args

procedure

job(Delayer::DelayedProcedure)

Return

self

# File lib/delayer/extend.rb, line 191
# Register a reserved job. It is not executed immediately; #pop_reserve hands
# it to #register once its reserve_at has been reached.
# ==== Args
# [procedure] job(Delayer::DelayedProcedure)
# ==== Return
# self
def reserve(procedure)
  lock.synchronize do
    if @last_reserve && !(@last_reserve > procedure)
      # The current front entry still comes first: just queue the new job.
      @reserves.add(procedure)
    else
      # The new job becomes the front entry; the old one (if any) is queued.
      @reserves.add(@last_reserve) if @last_reserve
      @last_reserve = procedure
    end
  end
  self
end
run(current_expire = @expire) click to toggle source

Run registered jobs.

Args

current_expire

expire for processing (secs, 0=unexpired)

Return

self

# File lib/delayer/extend.rb, line 89
# Run registered jobs.
#
# Reserved jobs that are already due are moved into the job list first. Jobs
# are then executed until the list is empty or, when a limit is given, until
# the limit has elapsed. If a remain hook is installed it is called when jobs
# are still left afterwards.
# ==== Args
# [current_expire] expire for processing (secs, 0=unexpired)
# ==== Return
# self
# ==== Raises
# Whatever a job raises; the exception is also stored in @exception.
def run(current_expire = @expire)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_f
  pop_reserve(start_time)
  if current_expire == 0
    run_once_without_pop_reserve until empty?
  else
    begin
      # The deadline follows the argument, not the configured default, so that
      # an explicit run(secs) is honoured.
      @end_time = end_time = start_time + current_expire
      run_once_without_pop_reserve while !empty? && (end_time >= Process.clock_gettime(Process::CLOCK_MONOTONIC))
    ensure
      # Clear the deadline even when a job raises, so expire? does not keep
      # reporting a stale deadline after the run is over.
      @end_time = nil
    end
  end
  if @remain_hook
    @remain_received = !empty?
    @remain_hook.call if @remain_received
  end
  self
rescue Exception => e # deliberate: every failure is recorded, then re-raised untouched
  @exception = e
  raise
end
run_once() click to toggle source

Run a job and forward pointer.

Return

self

# File lib/delayer/extend.rb, line 115
# Move due reserved jobs into the job list, then run a single job and
# forward the pointer.
# ==== Return
# whatever run_once_without_pop_reserve returns
def run_once
  pop_reserve
  run_once_without_pop_reserve
end
size(node = @bucket.first) click to toggle source

Returns the number of remaining jobs.

Return

Count of remaining jobs

# File lib/delayer/extend.rb, line 152
# Return the number of remaining jobs.
#
# The list is walked iteratively: a recursive walk needs one stack frame per
# job and raises SystemStackError on long queues.
# ==== Args
# [node] job to start counting from (defaults to the first job of the current bucket)
# ==== Return
# Count of jobs from +node+ to the end of the list
def size(node = @bucket.first)
  count = 0
  while node
    count += 1
    node = node.next
  end
  count
end
stash_enter!() click to toggle source

DelayerのStashレベルをインクリメントする。 このメソッドが呼ばれたら、その時存在するジョブは退避され、stash_exit!が呼ばれるまで実行されない。

# File lib/delayer/extend.rb, line 230
# Increment the Delayer stash level.
# The jobs that exist at the time of the call are set aside and are not run
# until stash_exit! is called.
# ==== Return
# self
def stash_enter!
  outer = @bucket
  # Fresh empty bucket that remembers the one it replaces.
  @bucket = Bucket.new(nil, nil, {}, outer)
  self
end
stash_exit!() click to toggle source

DelayerのStashレベルをデクリメントする。 このメソッドを呼ぶ前に、現在のレベルに存在するすべてのジョブを実行し、Delayer#empty?がtrueを返すような状態になっている必要がある。

Raises

Delayer::NoLowerLevelError

stash_enter!が呼ばれていない時

Delayer::RemainJobsError

ジョブが残っているのにこのメソッドを呼んだ時

# File lib/delayer/extend.rb, line 240
# Decrement the Delayer stash level.
# Every job of the current level must have been run before calling this, so
# that Delayer#empty? returns true.
# ==== Raises
# [Delayer::NoLowerLevelError] stash_enter! has not been called
# [Delayer::RemainJobsError] jobs are still left in the current level
# ==== Return
# the bucket that was restored
def stash_exit!
  outer = @bucket.stashed
  unless outer
    raise Delayer::NoLowerLevelError, 'stash_exit! called in level 0.'
  end
  unless empty?
    raise Delayer::RemainJobsError, 'Current level has remain jobs. It must be empty current level jobs in call this method.'
  end

  @bucket = outer
end
stash_level() click to toggle source

現在のDelayer Stashレベルを返す。

# File lib/delayer/extend.rb, line 249
# Return the current Delayer stash level.
# ==== Return
# the value reported by the current bucket's stash_size
def stash_level
  @bucket.stash_size
end
validate_priority(symbol) click to toggle source
# File lib/delayer/extend.rb, line 222
# Check that +symbol+ is one of the priorities listed in @priorities.
# ==== Args
# [symbol] priority to check
# ==== Raises
# [Delayer::InvalidPriorityError] +symbol+ is not a known priority
# ==== Return
# nil
def validate_priority(symbol)
  return if @priorities.include?(symbol)

  raise Delayer::InvalidPriorityError, "undefined priority '#{symbol}'"
end

Private Instance Methods

forward() click to toggle source
# File lib/delayer/extend.rb, line 255
# Detach the first job from the current bucket and return it.
# The job that followed it becomes the new first job, and every per-priority
# pointer that referenced the detached job is moved to that successor.
# ==== Raises
# [RuntimeError] the current bucket has no first job
# ==== Return
# the detached job, with its next link cleared
def forward
  lock.synchronize do
    head = @bucket.first
    raise 'Current bucket not found' unless head

    successor = head.next
    @bucket.first = successor
    # The list became empty, so there is no last job either.
    @bucket.last = nil unless successor
    pointers = @bucket.priority_of
    pointers.each do |priority, pointer|
      pointers[priority] = successor if head == pointer
    end
    head.next = nil
    head
  end
end
lock() click to toggle source
# File lib/delayer/extend.rb, line 269
# The lock object stored in @lock; #register, #reserve, #pop_reserve and
# #forward call synchronize on it.
# ==== Return
# the value of @lock
def lock
  @lock
end
run_once_without_pop_reserve() click to toggle source
# File lib/delayer/extend.rb, line 120
        # Run at most one job from the current bucket without looking at the
        # reserved jobs. Canceled jobs at the front of the list are discarded.
        # @busy is true while this is working and is always reset to false.
        # ==== Return
        # the result of the job's #run, or nil when no job was run
        def run_once_without_pop_reserve
          return unless @bucket.first

          @busy = true
          job = forward
          # Keep popping while the popped job is canceled and more jobs remain.
          job = forward while @bucket.first && job&.canceled?
          job.run if job && !job.canceled?
        ensure
          @busy = false
        end