class Flor::Waiter

Constants

PT_REX
ROW_PSEUDO_POINTS

Public Class Methods

new(exid, opts) click to toggle source
# File lib/flor/unit/waiter.rb, line 7
def initialize(exid, opts)

  serie, timeout, on_timeout =
    expand_args(opts)

  # TODO fail if the serie mixes msg_waiting with row_waiting...

  @exid = exid
  @serie = serie
  @timeout = timeout
  @on_timeout = on_timeout

  @queue = []
  @mutex = Mutex.new
  @var = ConditionVariable.new

  @executor = nil
end

Public Instance Methods

check(unit, rs) click to toggle source
# File lib/flor/unit/waiter.rb, line 68
    def check(unit, rs)

      @mutex.synchronize do

        row = nil

        loop do

          break if @serie.empty?

          row = row_match?(unit, rs)
          return false unless row

          @serie.shift
        end

        @queue << [ unit, row ]
        @var.signal
      end

      true # serie over, remove me

    rescue => err

#puts "!" * 80; p err
      unit.logger.warn(
        "#{self.class}#check()", err, '(returning true, aka remove me)')

      true # remove me
    end
msg_waiter?() click to toggle source
# File lib/flor/unit/waiter.rb, line 37
def msg_waiter?

  @serie.find { |_, points|
    points.find { |po|
      ! ROW_PSEUDO_POINTS.include?(po.split(':').first) } }
end
notify(executor, message) click to toggle source
# File lib/flor/unit/waiter.rb, line 49
def notify(executor, message)

  @executor = executor
    # could be handy

  @mutex.synchronize do

    return false unless msg_match?(message)

    @serie.shift
    return false if @serie.any?

    @queue << [ executor, message ]
    @var.signal
  end

  true # serie over, remove me
end
row_waiter?() click to toggle source

“tasker”, not “task”, since “task” is already a message point

# File lib/flor/unit/waiter.rb, line 29
def row_waiter?

  @serie.find { |_, points|
    points.find { |po|
      pos = po.split(':')
      pos.length > 1 && ROW_PSEUDO_POINTS.include?(pos[0]) } }
end
to_query_hashes() click to toggle source
# File lib/flor/unit/waiter.rb, line 121
def to_query_hashes

  @serie
    .inject([ [], [] ]) { |a, (nid, points)|

      points.each do |point|

        ss = point.split(':')

        h = {}
        h[:exid] = @exid if @exid
        h[:nid] = nid if nid

        case ss[0]
        when 'status'
          h[:status] = ss[1]
          a[0] << h
        when 'tag', 'tasker', 'var', 'variable'
          t = ss[0]; t = 'var' if t == 'variable'
          h[:type] = t
          h[:name] = ss[1]
          h[:value] = ss[2] if ss[2]
          a[1] << h
        else
          fail ArgumentError, "cannot turn to query_hash, #{self.inspect}"
        end
      end

      a }
end
to_s() click to toggle source
# File lib/flor/unit/waiter.rb, line 44
def to_s

  "#{super[0..-2]}#{{ exid: @exid, timeout: @timeout }.inspect}>"
end
wait() click to toggle source
# File lib/flor/unit/waiter.rb, line 99
def wait

  @mutex.synchronize do

    if @queue.empty?

      @var.wait(@mutex, @timeout)
        # will wait "in aeternum" if @timeout is nil

      if @queue.empty?
        fail RuntimeError.new(
          "timeout for #{self.to_s}, " +
          "msg_waiter? #{ !! msg_waiter?}, row_waiter? #{ !! row_waiter?}"
        ) if @on_timeout == 'fail'
        return { 'exid' => @exid, 'timed_out' => @on_timeout }
      end
    end

    @queue.shift[1]
  end
end

Protected Instance Methods

expand_args(opts) click to toggle source
# File lib/flor/unit/waiter.rb, line 225
def expand_args(opts)

  owait = opts[:wait]
  otimeout = opts[:timeout]
  oontimeout = opts[:on_timeout] || opts[:ontimeout] || 'fail'

  case owait
  when nil, true
    [ [ [ nil, %w[ failed terminated ] ] ], # serie
      otimeout,
      oontimeout ]
  when Numeric
    [ [ [ nil, %w[ failed terminated ] ] ], # serie
      owait, # timeout
      oontimeout ]
  when String, Array
    [ parse_serie(owait), # serie
      otimeout,
      oontimeout ]
  else
    fail ArgumentError.new(
      "don't know how to deal with #{owait.inspect} (#{owait.class})")
  end
end
msg_match?(message) click to toggle source
# File lib/flor/unit/waiter.rb, line 154
def msg_match?(message)

  mpoint = message['point']

  return false if @exid && @exid != message['exid'] && mpoint != 'idle'

  nid, points = @serie.first
  mnid = message['nid']

  return false if nid && mnid && nid != mnid

  return false unless points.find { |point|
    ps = point.split(':')
    next false if ps[0] != mpoint
    next false if ps[1] && ! message['tags'].include?(ps[1])
    true }

  true
end
parse_serie(s) click to toggle source
# File lib/flor/unit/waiter.rb, line 252
def parse_serie(s)

  return s if s.is_a?(Array) && s.collect(&:class).uniq == [ Array ]

  (s.is_a?(String) ? s.split(';') : s)
    .collect { |ss|

      k = StringScanner.new(ss.strip)

      ni = k.scan(Flor::START_NID_REX)

      k.scan(/\s*/)

      pts = []; loop do
          pt = k.scan(PT_REX); break unless pt
          pts << pt
          k.scan(/\s*[|,]\s*/)
        end

      fail ArgumentError.new(
        "cannot parse #{ss.strip.inspect} wait directive") unless k.eos?

      [ ni, pts ] }
end
row_match?(unit, rs) click to toggle source
# File lib/flor/unit/waiter.rb, line 174
def row_match?(unit, rs)

  nid, points = @serie.first

  row = nil

  points.find { |point|
    ps = point.split(':')
    row = send("row_match_#{ps[0]}?", unit, rs, nid, ps[1..-1]) }

  row
end
row_match_status?(unit, rs, _, cdr) click to toggle source
# File lib/flor/unit/waiter.rb, line 187
def row_match_status?(unit, rs, _, cdr)

  rs[0].find { |exe|
    (@exid == nil || exe.exid == @exid) &&
    exe.status == cdr.first }
end
row_match_tag?(unit, rs, nid, (name, value)) click to toggle source
# File lib/flor/unit/waiter.rb, line 194
def row_match_tag?(unit, rs, nid, (name, value))

  rs[1].find { |ptr|
    ptr.type == 'tag' &&
    (@exid == nil || ptr.exid == @exid) &&
    (nid == nil || ptr.nid == nid) &&
    (name == nil || ptr.name == name) &&
    (value == nil || ptr.value == value) }
end
row_match_tasker?(unit, rs, nid, (name, value)) click to toggle source
# File lib/flor/unit/waiter.rb, line 215
def row_match_tasker?(unit, rs, nid, (name, value))

  rs[1].find { |ptr|
    ptr.type == 'tasker' &&
    (@exid == nil || ptr.exid == @exid) &&
    (nid == nil || ptr.nid == nid) &&
    (name == nil || ptr.name == name) &&
    (value == nil || ptr.value == value) }
end
row_match_var?(unit, rs, nid, (name, value)) click to toggle source
# File lib/flor/unit/waiter.rb, line 204
def row_match_var?(unit, rs, nid, (name, value))

  rs[1].find { |ptr|
    ptr.type == 'var' &&
    (@exid == nil || ptr.exid == @exid) &&
    (nid == nil || ptr.nid == nid) &&
    (name == nil || ptr.name == name) &&
    (value == nil || ptr.value == value.to_s) }
end
Also aliased as: row_match_variable?
row_match_variable?(unit, rs, nid, (name, value))
Alias for: row_match_var?