class Flor::Storage

Constants

CRECON_STATUSES
FP_TYPES
MESSAGE_COLUMNS
POINTER_COLUMNS
POINTS_TO_ARCHIVE
RESCON_STATUSES

Attributes

archive[RW]

might be useful for some implementations

callbacks[R]
db[R]
models[R]
mutex[R]
unit[R]

Public Class Methods

new(unit) click to toggle source
# File lib/flor/unit/storage.rb, line 26
def initialize(unit)

  @unit = unit

  @models = {}
  @archive = @unit.conf['sto_archive']
  @mutex = @unit.conf['sto_sync'] ? Mutex.new : nil

  @callbacks = {}

  connect
end

Protected Class Methods

from_blob(content) click to toggle source
# File lib/flor/unit/storage.rb, line 949
def from_blob(content)

  content ? JSON.parse(Zlib::Inflate.inflate(content)) : nil
end
to_blob(h) click to toggle source
# File lib/flor/unit/storage.rb, line 943
      def to_blob(h)

        h ? Sequel.blob(Zlib::Deflate.deflate(JSON.dump(h))) : nil
#rescue => e; pp h; raise e
      end

Public Instance Methods

any_message?() click to toggle source
# File lib/flor/unit/storage.rb, line 297
def any_message?

  synchronize do

    @db[:flor_messages].where(status: 'created').count > 0
  end

rescue => err

  @unit.logger.warn(
    "#{self.class}#any_message?()", err, '(returning false)')

  false
end
consume(messages) click to toggle source
# File lib/flor/unit/storage.rb, line 332
def consume(messages)

  if @archive
    consume_and_archive(messages)
  else
    consume_and_discard(messages)
  end

rescue => err

  Thread.current[:sto_errored_items] = messages
  raise err
end
db_version(opts={}) click to toggle source
# File lib/flor/unit/storage.rb, line 45
def db_version(opts={})

  table, column = migration_table_and_column(opts)

  (@db[table].first rescue {})[column]
end
delete_tables() click to toggle source

Delete tables in the storage database that begin with “flor_” and have more than 2 columns (the Sequel schema_info table has 1 column as of this writing)

# File lib/flor/unit/storage.rb, line 129
def delete_tables

  @db.tables.each { |t|
    @db[t].delete \
      if t.to_s.match(/^flor_/) && @db[t].columns.size > 2 }
end
fetch_next_time() click to toggle source
# File lib/flor/unit/storage.rb, line 542
def fetch_next_time

  t =
    synchronize do
      @db[:flor_timers]
        .select(:ntime)
        .order(:ntime)
        .first(status: 'active')
    end

  t ? t[:ntime].split('.').first : nil

rescue => err

  @unit.logger.warn(
    "#{self.class}#fetch_next_time()", err, '(returning nil)')

  nil
end
fetch_traps(exid) click to toggle source
# File lib/flor/unit/storage.rb, line 312
def fetch_traps(exid)

  synchronize do

    traps
      .where(status: 'active')
      .where(domain: split_domain(exid))
      .all
  end

rescue => err

  @unit.logger.warn(
    "#{self.class}#fetch_traps()", err, '(returning [])')

  []
end
load_execution(exid) click to toggle source
# File lib/flor/unit/storage.rb, line 136
def load_execution(exid)

  synchronize do

    e = @db[:flor_executions]
      .select(:id, :content)
      .first(exid: exid) # status active or terminated doesn't matter

    return {
      'exid' => exid, 'nodes' => {}, 'counters' => {},
      'start' => Flor.tstamp, 'size' => 0
    } unless e

    ex = from_blob(e[:content])

    fail("couldn't parse execution (db id #{e[:id].to_i})") unless ex

    ex['id'] = e[:id].to_i
    ex['size'] = e[:content].size

    ex
  end
end
load_messages(exe_count) click to toggle source
# File lib/flor/unit/storage.rb, line 232
def load_messages(exe_count)

  exe_count += 2
    # load two more, could prove useful if they vanish like "petits pains"

  synchronize do

    _exids_being_processed =
      @db[:flor_messages]
        .select(:exid)
        .exclude(status: CRECON_STATUSES)
    _exids =
      @db[:flor_messages]
        .select(:exid)
        .exclude(exid: _exids_being_processed)
        .exclude(status: RESCON_STATUSES)
        .limit(exe_count)
    @db[:flor_messages]
      .where(exid: _exids, status: 'created')
      .inject({}) { |h, m| (h[m[:exid]] ||= []) << m; h }
  end

rescue => err

  @unit.logger.warn(
    "#{self.class}#load_messages()", err, '(returning {})')

  {}
end
migrate(to=nil, from=nil, opts=nil) click to toggle source
# File lib/flor/unit/storage.rb, line 90
def migrate(to=nil, from=nil, opts=nil)

  opts = [ to, from, opts ].find { |e| e.is_a?(Hash) } || {}
  opts[:target] ||= to if to.is_a?(Integer)
  opts[:current] ||= from if from.is_a?(Integer)

  opts[:table], opts[:column] = migration_table_and_column(opts)
    #
    # defaults for the migration version table:
    # { table: :schema_info,
    #   column: :version }

  skip =
    opts[:sparse_migrations] ||
    @unit.conf['db_sparse_migrations'] ||
    @unit.conf['sto_sparse_migrations']
  if skip && ! opts.has_key?(:allow_missing_migration_files)
    opts[:allow_missing_migration_files] = true
  end

  dir =
    opts[:migrations] ||
    opts[:migration_dir] ||
    @unit.conf['db_migrations'] ||
    @unit.conf['db_migration_dir'] ||
    @unit.conf['sto_migrations'] ||
    @unit.conf['sto_migration_dir'] ||
    Flor.migration_dir

  synchronize do

    Sequel::Migrator.run(@db, dir, opts)
  end
end
migration_version() click to toggle source
# File lib/flor/unit/storage.rb, line 52
def migration_version

  Dir[File.join(File.dirname(__FILE__), '../migrations/*.rb')]
    .inject([]) { |a, fn|
      m = File.basename(fn).match(/^(\d{4})_/)
      a << m[1].to_i if m
      a
    }
    .max
end
on(key, actions=[], &block) click to toggle source
# File lib/flor/unit/storage.rb, line 562
def on(key, actions=[], &block)

  as =
    case actions
    when :any, 'any' then []
    when Array then actions
    when Symbol then [ actions ]
    when String then actions.split(/\s*[;,]\s*/)
    else []
    end
      .collect(&:to_sym)

  (@callbacks[key] ||= []) << [ as, block ]
end
put_execution(exe) click to toggle source
# File lib/flor/unit/storage.rb, line 160
def put_execution(exe)

  status =
    exe['nodes'].find { |_, n| n['status'].last['status'] != 'ended' } ?
    'active' :
    'terminated'

  id = exe['id']

  if id

    exe['end'] ||= Flor.tstamp \
      if status == 'terminated'
    exe['duration'] = Time.parse(exe['end']) - Time.parse(exe['start']) \
      if exe['end']
  end

  data = to_blob(exe)
  exe['size'] = data.size

  u = @unit.identifier

  transync do

    now = Flor.tstamp

    if id

      @db[:flor_executions]
        .where(id: id.to_i)
        .update(
          content: data,
          status: status,
          mtime: now,
          munit: u)

      callback(:executions, :update, id)

    else

      exe['id'] =
        @db[:flor_executions]
          .insert(
            domain: Flor.domain(exe['exid']),
            exid: exe['exid'],
            content: data,
            status: status,
            ctime: now,
            mtime: now,
            cunit: u,
            munit: u)
          .to_i

      callback(:executions, :insert, exe['id'])
    end

    remove_nodes(exe, status, now)
    update_pointers(exe, status, now)
  end

  exe
    # return the execution hash

rescue => err

  Thread.current[:sto_errored_items] = [ exe ]
  raise err
end
put_message(m) click to toggle source
# File lib/flor/unit/storage.rb, line 392
def put_message(m)

  put_messages([ m ])
end
put_messages(ms, syn=true) click to toggle source
# File lib/flor/unit/storage.rb, line 346
def put_messages(ms, syn=true)

  return if ms.empty?

  n = Flor.tstamp
  u = @unit.identifier

  id =
    synchronize(syn) do

      stored, unstored = ms.partition { |m| m['mid'] }

      #
      # de-reserve any previously stored message, might happen
      # for "terminated" messages that got queued back to let
      # other messages get processed

      @db[:flor_messages]
        .where(id: stored.collect { |m| m['mid'] })
        .update(status: 'created', mtime: n, munit: u) \
          if stored.any?

      #
      # store new messages

      @db[:flor_messages]
        .import(
          MESSAGE_COLUMNS,
          unstored.map { |m|
            [ Flor.domain(m['exid']), m['exid'], m['point'], to_blob(m),
              'created', n, n, u, u ] }) \
                if unstored.any?

      @db[:flor_messages].max(:id)
    end

  @unit.wake_up

  id

rescue => err

  Thread.current[:sto_errored_items] = ms
  raise err
end
put_timer(message) click to toggle source
# File lib/flor/unit/storage.rb, line 418
def put_timer(message)

  type, string = determine_type_and_schedule(message)

  next_time = compute_next_time(type, string)

  now = Flor.tstamp
  u = @unit.identifier

  id =
    synchronize do

      @db[:flor_timers]
        .insert(
          domain: Flor.domain(message['exid']),
          exid: message['exid'],
          nid: message['nid'],
          onid: message['onid'] || message['nid'],
          bnid: message['nid'],
          type: type,
          schedule: string,
          ntime: next_time,
          content: to_blob(message),
          count: 0,
          status: 'active',
          ctime: now,
          mtime: now,
          cunit: u,
          munit: u)
    end

  callback(:timers, :insert, id)

  @unit.wake_up

rescue => err

  Thread.current[:sto_errored_items] = [ message ]
  raise err
end
put_trap(node, tra) click to toggle source
# File lib/flor/unit/storage.rb, line 475
def put_trap(node, tra)

  exid = node['exid']
  dom = Flor.domain(exid)
  now = Flor.tstamp
  u = @unit.identifier

  id =
    synchronize do

#points = att_a('point', 'points', nil)  ### TODO
#tags = att_a('tag', 'tags', nil)        #
#heats = att_a('heat', 'heats', nil)     #
#heaps = att_a('heap', 'heaps', nil)     #
#names = att_a('name', 'names', nil)     #
    #
  #opts[:heap] = theaps.split(',') if theaps
  #opts[:heat] = theats.split(',') if theats
        #
      @db[:flor_traps]
        .insert(
          domain: dom,
          exid: exid,
          nid: tra['nid'],
          onid: tra['onid'] || tra['nid'],
          bnid: tra['bnid'],
          trange: tra['range'],
          tpoints: commaify(tra['points']),
          ttags: commaify(tra['tags']),
          theats: commaify(tra['heats']),
          theaps: commaify(tra['heaps']),
          content: to_blob(tra),
          status: 'active',
          ctime: now,
          mtime: now,
          cunit: u,
          munit: u)
    end

  callback(:traps, :insert, id)

  traps[id]

rescue => err

  Thread.current[:sto_errored_items] = [ node, tra ]
  raise err
end
ready?() click to toggle source
# File lib/flor/unit/storage.rb, line 63
def ready?

  db_version == migration_version
end
reserve_all_messages(messages) click to toggle source
# File lib/flor/unit/storage.rb, line 262
def reserve_all_messages(messages)

  now = Flor.tstamp
  count = 0

  transync do

    messages.each do |m|

      c = @db[:flor_messages]
        .where(
          id: m[:id].to_i, status: 'created',
          mtime: m[:mtime], munit: m[:munit])
        .update(
          status: 'reserved', mtime: now, munit: @unit.identifier)

      raise Sequel::Rollback if c != 1

      count += 1
    end
  end

  count == messages.size
    # true means success: all the messages could be reserved,
    # executor is clear to work on the execution

rescue => err

  @unit.logger.warn(
    "#{self.class}#reserve_all_messages()", err, '(returning false)')

  false
    # failure
end
shutdown() click to toggle source
# File lib/flor/unit/storage.rb, line 39
    def shutdown

      @db.disconnect
#p [ :disconnected, @db.object_id ]
    end
synchronize(on=true, &block) click to toggle source
# File lib/flor/unit/storage.rb, line 68
def synchronize(on=true, &block)

  Thread.current[:sto_errored_items] = nil if on

  if @mutex && on
    @mutex.synchronize(&block)
  else
    block.call
  end
end
trace(exid, nid, tracer, text) click to toggle source
# File lib/flor/unit/storage.rb, line 524
def trace(exid, nid, tracer, text)

  text = text.is_a?(String) ? text : JSON.dump(text)

  synchronize do

    @db[:flor_traces]
      .insert(
        domain: Flor.domain(exid),
        exid: exid,
        nid: nid,
        tracer: tracer,
        text: text,
        ctime: Flor.tstamp,
        cunit: @unit.identifier)
  end
end
transync(on=true, &block) click to toggle source
# File lib/flor/unit/storage.rb, line 79
def transync(on=true, &block)

  Thread.current[:sto_errored_items] = nil if on

  if @mutex && on
    @mutex.synchronize { @db.transaction(&block) }
  else
    block.call
  end
end
trigger_timers() click to toggle source
# File lib/flor/unit/storage.rb, line 459
def trigger_timers

  synchronize do

    load_timers.each do |t|

      @db.transaction do

        next unless reschedule_timer(t) == 1

        trigger_timer(t)
      end
    end
  end
end
unreserve_messages(max_sec) click to toggle source
# File lib/flor/unit/storage.rb, line 397
def unreserve_messages(max_sec)

  tstamp = Flor.tstamp(Time.now - max_sec)
  tstamp = tstamp[0..tstamp.rindex('.')]

  synchronize do

    @db[:flor_messages]
      .where(status: 'reserved')
      .where { mtime < tstamp }
      .update(status: 'created')
  end

rescue => err

  @unit.logger.warn(
    "#{self.class}#unreserve_messages(#{max_sec})", err, '(returning nil)')

  -1 # not zero, to indicate a problem
end

Protected Instance Methods

_commaify(o) click to toggle source
# File lib/flor/unit/storage.rb, line 591
def _commaify(o)

  if Flor.is_regex_tree?(o)
    o[1].to_s
  else #if o.is_a?(String)
    o.split(/\s*,\s*/).join(',')
  end
end
call_back(block, table, action, *rest) click to toggle source
# File lib/flor/unit/storage.rb, line 883
def call_back(block, table, action, *rest)

  block.call(
    *(
      block.arity < 0 ?
      [ table, action, *rest ] :
      [ table, action, *rest ][0, block.arity]))
end
callback(table, action, *rest) click to toggle source
# File lib/flor/unit/storage.rb, line 875
def callback(table, action, *rest)

  (@callbacks[table] || [])
    .each { |as, block|
      call_back(block, table, action, *rest) \
        if as.empty? || as.include?(action) }
end
commaify(o) click to toggle source
# File lib/flor/unit/storage.rb, line 600
def commaify(o)

  return nil unless o

  o = [ o ] if Flor.is_regex_tree?(o)
  o = [ o ] unless o.is_a?(Array)

  o.collect { |e| _commaify(e) }.join(',')
end
compute_next_time(type, string, from=nil) click to toggle source
# File lib/flor/unit/storage.rb, line 852
def compute_next_time(type, string, from=nil)

  f =
    case type
    when 'cron' then Fugit.parse_cron(string) || Fugit.parse_nat(string)
    when 'at' then Fugit.parse_at(string)
    when 'in' then Fugit.parse_duration(string)
    #when 'every' then Fugit.parse_duration(string)
    else Fugit.parse(string)
    end

  nt = f.is_a?(Time) ? f : f.next_time(from || Time.now) # local...

  Flor.tstamp(nt.utc)
end
connect() click to toggle source
# File lib/flor/unit/storage.rb, line 918
def connect

  @db = derive_db

  class << @db; attr_accessor :flor_unit; end
  @db.flor_unit = @unit

  if cv = @unit.conf['sto_connection_validation']

    to = cv.is_a?(Numeric) || cv.is_a?(String) ? cv.to_i : -1

    @db.extension(:connection_validator)
    @db.pool.connection_validation_timeout = to
      # NB: -1 means "check at every use"
  end

  if @unit.conf['sto_db_logger'] != false

    @db_logger = DbLogger.new(@unit)
    @db.loggers << @db_logger
  end
end
consume_and_archive(messages) click to toggle source
# File lib/flor/unit/storage.rb, line 610
def consume_and_archive(messages)

  transync do

    n = Flor.tstamp
    u = @unit.identifier

    @db[:flor_messages]
      .where(
        id: messages.collect { |m| m['mid'] }.uniq.compact)
      .update(
        status: 'consumed', mtime: n, munit: u)

    @db[:flor_messages]
      .import(
        MESSAGE_COLUMNS,
        messages
          .select { |m|
            ! m['mid'] && POINTS_TO_ARCHIVE.include?(m['point']) }
          .map { |m|
            [ Flor.domain(m['exid']), m['exid'], m['point'], to_blob(m),
              'consumed', n, n, u, u ] })
  end
end
consume_and_discard(messages) click to toggle source
# File lib/flor/unit/storage.rb, line 635
def consume_and_discard(messages)

  synchronize do

    @db[:flor_messages]
      .where(
        id: messages.collect { |m| m['mid'] }.uniq.compact)
      .delete
  end
end
derive_db() click to toggle source
# File lib/flor/unit/storage.rb, line 900
def derive_db

  db = @unit.conf['sto_db']

  return db if db

  uri = @unit.conf['sto_uri']

  fail ArgumentError.new("no 'sto_uri' conf, cannot connect to db") \
    unless uri

  begin
    Kernel.const_get(uri)
  rescue NameError
    Sequel.connect(uri)
  end
end
determine_type_and_schedule(message) click to toggle source

def pointer_columns

@pointer_columns ||=
  if @db[:flor_pointers].columns.include?(:content)
    POINTER_COLUMNS + [ :content ]
  else
    POINTER_COLUMNS
  end

end

# File lib/flor/unit/storage.rb, line 838
def determine_type_and_schedule(message)

  t, s = message['type'], message['string']
  return [ t, s ] if t

  t = Fugit.determine_type(s)
  return [ t, s ] if t

  s = "every #{s}"
  return [ 'cron', s ] if Fugit.parse_nat(s)

  nil
end
from_blob(content) click to toggle source
# File lib/flor/unit/storage.rb, line 956
def from_blob(content); self.class.from_blob(content); end
load_timers() click to toggle source
# File lib/flor/unit/storage.rb, line 646
def load_timers

  timers
    .where(status: 'active')
    .where { ntime <= Flor.tstam }
    .order(:ntime)
    .all

rescue => err

  @unit.logger.warn("#{self.class}#load_timers()", err, '(returning [])')

  []
end
migration_table_and_column(opts={}) click to toggle source
# File lib/flor/unit/storage.rb, line 579
def migration_table_and_column(opts={})

  [ (opts[:migration_table] ||
     @unit.conf['db_migration_table'] ||
     @unit.conf['sto_migration_table'] ||
     :schema_info).to_sym,
    (opts[:migration_column] ||
      @unit.conf['db_migration_column'] ||
      @unit.conf['sto_migration_column'] ||
      :version).to_sym ]
end
remove_nodes(exe, status, now) click to toggle source
# File lib/flor/unit/storage.rb, line 708
def remove_nodes(exe, status, now)

  exid = exe['exid']

  x = (status == 'terminated') ? {} : { nid: exe['nodes'].keys }
    # if 'terminated' include all nodes

  if @archive
    @db[:flor_timers].where(exid: exid).exclude(x).update(status: 'removed')
    @db[:flor_traps].where(exid: exid).exclude(x).update(status: 'removed')
  else
    @db[:flor_timers].where(exid: exid).exclude(x).delete
    @db[:flor_traps].where(exid: exid).exclude(x).delete
  end

  #@db[:flor_pointers].where(exid: exid).exclude(x).delete
    # done in update_pointers
end
reschedule_timer(t) click to toggle source
# File lib/flor/unit/storage.rb, line 666
def reschedule_timer(t)

  w = { id: t.id.to_i, status: 'active', mtime: t.mtime, munit: t.munit }
  r = nil

  if t.type != 'at' && t.type != 'in'

    r = @db[:flor_timers]
      .where(w)
      .update(
        count: t.count.to_i + 1,
        status: 'active',
        ntime: compute_next_time(t.type, t.schedule, t.ntime_t),
        mtime: Flor.tstamp,
        munit: @unit.identifier)

    callback(:timers, :update, w, t)

  elsif @archive

    r = @db[:flor_timers]
      .where(w)
      .update(
        count: t.count.to_i + 1,
        status: 'triggered',
        mtime: Flor.tstamp,
        munit: @unit.identifier)

    callback(:timers, :update, w, t)

  else

    r = @db[:flor_timers]
      .where(w)
      .delete

    callback(:timers, :delete, w, t)
  end

  r
end
split_domain(exid) click to toggle source
# File lib/flor/unit/storage.rb, line 868
def split_domain(exid)

  Flor.domain(exid)
    .split('.')
    .inject([]) { |a, elt| a << [ a.last, elt ].compact.join('.'); a }
end
to_blob(h) click to toggle source
# File lib/flor/unit/storage.rb, line 955
def to_blob(h); self.class.to_blob(h); end
trigger_timer(t) click to toggle source
# File lib/flor/unit/storage.rb, line 661
def trigger_timer(t)

  put_messages([ t.to_trigger_message ], false)
end
update_pointers(exe, status, now) click to toggle source
# File lib/flor/unit/storage.rb, line 729
    def update_pointers(exe, status, now)

# Q  Should we archive old pointers?
# A  Well, it might be better to only archive the execution and leave
#    in there enough information...

      exid = exe['exid']

      if status == 'terminated'
        @db[:flor_pointers].where(exid: exid).delete
        return
      end

      @db[:flor_pointers]
        .where(exid: exid)
        .where(Sequel.|({ type: FP_TYPES }, Sequel.~(nid: exe['nodes'].keys)))
        .delete
          #
          # Delete all pointer to vars, their value might have changed,
          # let's reinsert them.
          # Delete pointers to gone nodes.

      dom = Flor.domain(exid)
      u = @unit.identifier

      pointers = exe['nodes']
        .inject([]) { |a, (nid, node)|

          # add a pointer for each tag

          ts = node['tags']
          ts.each { |t|
            a << [ dom, exid, nid, 'tag', t, nil, now, u, nil ] } if ts

          # add a pointer for each var (if nid == '0')

          vs = nid == '0' ? node['vars'] : nil
          vs.each { |k, v|
            case v; when Numeric, String, TrueClass, FalseClass, NilClass
              a << [ dom, exid, '0', 'var', k, v.to_s, now, u, v ]
            when Array, Hash
              s = '(array)'; s = '(object)' if v.is_a?(Hash)
              a << [ dom, exid, '0', 'var', k, s, now, u, v ]
            else
              a << [ dom, exid, '0', 'var', k, nil, now, u, v ]
            end } if vs

          # add a pointer for the task if any

          if ta = node['task']
            tasker = ta['tasker']
            n = ta['name']; name = n.is_a?(String) ? n : JSON.dump(n)
            content = { message: node['message'], atts: node['atts'] }
            a << [ dom, exid, nid, 'tasker', tasker, name, now, u, content ]
          end

          # add a pointer for the error if any

          if fa = node['failure']

#puts "-" * 80; pp node; puts "-" * 80
            a <<
              if er = fa['error']
                ni = fa['from'] || nid # not nid /!\
                nam = "#{er['kla']} l#{er['lin']}"
                val = er['msg']
                con = { error: fa, nid: ni }
                [ dom, exid, ni, 'failure', nam, val, now, u, con ]
              else
                nam = fa['tasker'] || 'failure'
                val = [ fa['attl'] || [], fa['attd'] || {} ]
                  .collect(&:inspect).join(' ')
                con = { error: fa, nid: nid }
                [ dom, exid, nid, 'failure', nam, val, now, u, con ]
              end
          end

          # done

          a }

      cps = @db[:flor_pointers] # current pointers
        .where(exid: exid)
        .select(:nid, :type, :name)
        .all
      pointers.reject! { |_, _, ni, ty, na, _, _, _, _|
        cps.find { |cp| cp[:nid] == ni && cp[:type] == ty && cp[:name] == na } }
          #
          # don't insert when already inserted

      pointers.each { |ptr| c = ptr[8]; ptr[8] = to_blob(c) if c }

      @db[:flor_pointers]
        .import(
          POINTER_COLUMNS,
          pointers)

      callback(:pointers, :update, exid)
    end