class Flor::WaitList

Constants

DEFAULT_TIMEOUT

Regular waiters are message waiters, they wait for a message that matches a pattern

Row waiters are waiting for the pattern to realize in the database a better name would probably have been “query waiter”. Row waiters need their own thread for checking at interval. Row waiters can live in a different Ruby process from the Ruby process performing the executions.

Public Class Methods

new(unit) click to toggle source
# File lib/flor/unit/wlist.rb, line 26
def initialize(unit)

  @unit = unit
  @unit.hooker.add('wlist', self)

  @mutex = Mutex.new
  @msg_waiters = []
  @row_waiters = []

  @unit.instance_eval do
    def wait(exid, opts=true, more=nil)
      @hooker['wlist'].wait(exid, opts, more)
    end
  end

  @row_thread = nil
  @row_thread_status = nil
  @row_frequency = @unit.conf['wtl_row_frequency'] || 1
end

Public Instance Methods

notify(executor, message) click to toggle source
# File lib/flor/unit/wlist.rb, line 53
def notify(executor, message)

  return [] unless message['consumed']
  return [] if @msg_waiters.empty?

  @mutex.synchronize do

    to_remove =
      @msg_waiters.each_with_object([]) do |w, a|
        remove = w.notify(executor, message)
        a << w if remove
      end

    @msg_waiters -= to_remove
  end

  [] # no new messages
end
shutdown() click to toggle source
# File lib/flor/unit/wlist.rb, line 46
def shutdown

  @row_thread_status = :shutdown

  nil
end
wait(exid, opts=true, more=nil) click to toggle source
# File lib/flor/unit/wlist.rb, line 36
def wait(exid, opts=true, more=nil)
  @hooker['wlist'].wait(exid, opts, more)
end

Protected Instance Methods

check() click to toggle source
# File lib/flor/unit/wlist.rb, line 135
def check

  @mutex.synchronize do

    to_remove = []

    rs = row_query_all

    @row_waiters.each do |w|
      remove = w.check(@unit, rs)
      to_remove << w if remove
    end

    @row_waiters -= to_remove
  end

rescue => err

  @unit.logger.warn("#{self.class}#check()", err)
end
row_query(klass, hs) click to toggle source
# File lib/flor/unit/wlist.rb, line 168
def row_query(klass, hs)

  return [] if hs.empty?

  q = @unit.send(klass).where(hs.shift)
  hs.each { |h| q = q.or(h) }

  q.all
end
row_query_all() click to toggle source
# File lib/flor/unit/wlist.rb, line 156
def row_query_all

  exes, ptrs =
    @row_waiters.inject([ [], [] ]) { |a, w|
      es, ps = w.to_query_hashes
      a[0].concat(es)
      a[1].concat(ps)
      a }

  [ row_query(:executions, exes), row_query(:pointers, ptrs) ]
end
start_row_thread() click to toggle source
# File lib/flor/unit/wlist.rb, line 117
def start_row_thread

  return if @row_thread_status == :shutdown

  @row_thread = nil if @row_thread && ! @row_thread.alive?
  @row_thread_status = :running

  @row_thread ||=
    Thread.new do
      loop do
        sleep(@row_frequency)
        break if [ :stop, :shutdown ].include?(@row_thread_status)
        break if @row_waiters.empty?
        check
      end
    end
end