class Ruote::ActiveRecord::Storage

Public Class Methods

new(options = {}) click to toggle source
# File lib/ruote/ar/storage.rb, line 13
def initialize(options = {})

  @table_name = options['table_name'] || ('documents').to_sym
  @ip = Ruote.local_ip
  @last_time = Time.at(0.0).utc

  @worker = [current_worker_name, @ip.gsub(/\./, '_')].join('/')

  replace_engine_configuration(options) unless options.empty?
end

Public Instance Methods

add_type(type) click to toggle source

Mainly used by ruote’s test/unit/ut_17_storage.rb

# File lib/ruote/ar/storage.rb, line 192
def add_type(type)
  # does nothing, types are differentiated by the 'typ' column
end
by_field(type, field, value, opts={}) click to toggle source

Querying workitems by field (warning, goes deep into the JSON structure)

# File lib/ruote/ar/storage.rb, line 220
def by_field(type, field, value, opts={})

  raise NotImplementedError if type != 'workitems'

  lk = [ '%"', field, '":' ]
  lk.push(Rufus::Json.encode(value)) if value
  lk.push('%')

  docs = table.where(table[:typ].eq(type).and(table[:doc].matches(lk.join)))

  return connection.select_value(docs.project('count(*)')) if opts[:count]
  
  docs = connection.select_all(docs.project('*').order(table[:ide].asc, table[:rev].desc).take(opts[:limit]).skip(opts[:offset] || opts[:skip]))
  select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d['doc']) }
end
by_participant(type, participant_name, opts={}) click to toggle source

A provision made for workitems, allow to query them directly by participant name.

# File lib/ruote/ar/storage.rb, line 205
def by_participant(type, participant_name, opts={})

  raise NotImplementedError if type != 'workitems'

  docs = table.where(table[:typ].eq(type).and(table[:participant_name].eq(participant_name)))

  return connection.select_value(docs.project('count(*)')) if opts[:count]
  
  docs = connection.select_all(docs.project('*').order(table[:ide].asc, table[:rev].desc).take(opts[:limit]).skip(opts[:offset] || opts[:skip]))

  select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d['doc']) }
end
close() click to toggle source

Grrr… I should sort the mess between close and shutdown… Tests vs production :-(

# File lib/ruote/ar/storage.rb, line 186
def close
  shutdown
end
delete(doc) click to toggle source
# File lib/ruote/ar/storage.rb, line 108
def delete(doc)
  raise true if doc.nil?
 
  raise ArgumentError.new('no _rev for doc') unless doc['_rev']

  # usually not necessary, adding it not to forget it later on

  dm = Arel::DeleteManager.new Arel::Table.engine
  dm.from table
  dm.where table[:typ].eq(doc['type']).and(table[:ide].eq(doc['_id']).and(table[:rev].eq(doc['_rev'].to_i)))
  count = connection.delete(dm)

  return (get(doc['type'], doc['_id']) || true) if count < 1
  # failure

  nil
  # success
end
done(doc) click to toggle source

removing doc after success (or fail) success. It’s important to not leave any message.

# File lib/ruote/ar/storage.rb, line 52
def done(doc)
  dm = Arel::DeleteManager.new Arel::Table.engine
  dm.from table
  dm.where table[:typ].eq(doc['type']).and(table[:ide].eq(doc['_id']).and(table[:rev].eq(1).and(table[:worker].eq(@worker))))
  connection.delete(dm)
end
get(type, key) click to toggle source
# File lib/ruote/ar/storage.rb, line 104
def get(type, key)
  do_get(type, key)
end
get_many(type, key=nil, opts={}) click to toggle source
# File lib/ruote/ar/storage.rb, line 127
def get_many(type, key=nil, opts={})

  ds = table[:typ].eq(type)

  keys = key ? Array(key) : nil
  ds = ds.and(table[:wfid].in(keys)) if keys && keys.first.is_a?(String)
  ds = ds.and(table[:worker].eq(nil)) if type == 'msgs'
  
  ds = table.where(ds)

  return connection.select_value(ds.project(table[:wfid].count)) if opts[:count]

  if opts[:descending].is_a?(Array) && opts[:descending].first.class != String
    opts[:descending] = opts[:descending].collect {|s| s.inspect.gsub(':','').gsub('.', ' ')}
  end

  if opts[:descending]
    ds = ds.order(table[:ide].desc, table[:rev].desc)
  else 
    ds = ds.order(table[:ide].asc, table[:rev].asc)
  end

  ds = ds.take(opts[:limit]).skip(opts[:skip]||opts[:offset])

  docs = connection.select_all(ds.project('*'))
  docs = select_last_revs(docs)
  docs = docs.collect { |d| Rufus::Json.decode(d['doc']) }

  if keys && keys.first.is_a?(Regexp)
    docs.select { |doc| keys.find { |k| k.match(doc['_id']) } }
  else
    docs
  end

  # (pass on the dataset.filter(:wfid => /regexp/) for now
  # since we have potentially multiple keys)
end
ids(type) click to toggle source

Returns all the ids of the documents of a given type.

# File lib/ruote/ar/storage.rb, line 167
def ids(type)
  connection.select_values(table.where(table[:typ].eq(type)).project('distinct ide').order(table[:ide]))
end
purge!() click to toggle source

Nukes all the documents in this storage.

# File lib/ruote/ar/storage.rb, line 173
def purge!
  # just for test
end
purge_type!(type) click to toggle source

Nukes a db type and reputs it (losing all the documents that were in it).

# File lib/ruote/ar/storage.rb, line 198
def purge_type!(type)
  # just for test
end
put(doc, opts={}) click to toggle source
# File lib/ruote/ar/storage.rb, line 72
def put(doc, opts={})

  if doc['_rev']

    d = get(doc['type'], doc['_id'])

    return true unless d
    return d if d['_rev'] != doc['_rev']
      # failures
  end

  nrev = doc['_rev'].to_i + 1

  begin

    do_insert(doc, nrev, opts[:update_rev])

  rescue Exception => de
    puts "Error putting: #{de.message}: #{doc.inspect}"
    return (get(doc['type'], doc['_id']) || true)
      # failure
  end
  
  dm = Arel::DeleteManager.new Arel::Table.engine
  dm.from table
  dm.where table[:typ].eq(doc['type']).and(table[:ide].eq(doc['_id']).and(table[:rev].lt(nrev)))
  connection.delete(dm)

  nil
    # success
end
put_msg(action, options) click to toggle source
# File lib/ruote/ar/storage.rb, line 25
def put_msg(action, options)

  # put_msg is a unique action, no need for all the complexity of put
  do_insert(prepare_msg_doc(action, options), 1)

  nil
end
put_schedule(flavour, owner_fei, s, msg) click to toggle source
# File lib/ruote/ar/storage.rb, line 59
def put_schedule(flavour, owner_fei, s, msg)

  # put_schedule is a unique action, no need for all the complexity of put

  doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

  return nil unless doc

  do_insert(doc, 1)

  doc['_id']
end
query_workitems(criteria) click to toggle source
# File lib/ruote/ar/storage.rb, line 236
def query_workitems(criteria)

  ds = table[:typ].eq('workitems')
  
  wfid = criteria.delete('wfid')
  ds = ds.and(table[:ide].matches("%!#{wfid}")) if wfid
  
  pname = criteria.delete('participant_name') || criteria.delete('participant')
  ds = ds.and(table[:participant_name].eq(pname)) if pname

  count = criteria.delete('count')
  limit = criteria.delete('limit')
  offset = criteria.delete('offset') || criteria.delete('skip')

  criteria.collect do |k, v|
    ds = ds.and(table[:doc].matches("%\"#{k}\":#{Rufus::Json.encode(v)}%"))
  end
  
  ds = table.where(ds).take(limit).skip(offset)

  return connection.select_one(ds.project(table[:wfid].count)).first if count
  
  select_last_revs(connection.select_all(ds.project('*'))).collect { |d| Ruote::Workitem.from_json(d['doc']) } 
end
reserve(doc) click to toggle source

Used to reserve ‘msgs’ and ‘schedules’. Simply update and return true if the update was affected more than one line.

# File lib/ruote/ar/storage.rb, line 36
def reserve(doc)
  um = Arel::UpdateManager.new Arel::Table.engine
  um.table table
  um.where table[:typ].eq(doc['type'].to_s).
    and(table[:ide].eq(doc['_id'].to_s).
        and(table[:rev].eq(1).
            and(table[:worker].eq(nil))))
  um.set [
    [table[:worker], @worker]
  ]
  return connection.update(um.to_sql) > 0
end
shutdown() click to toggle source

Returns connection to pool

# File lib/ruote/ar/storage.rb, line 178
def shutdown
  ::ActiveRecord::Base.clear_active_connections!
  ::ActiveRecord::Base.connection.close
end

Protected Instance Methods

decode_doc(doc) click to toggle source
# File lib/ruote/ar/storage.rb, line 263
def decode_doc(doc)

  return nil if doc.nil?

  doc = doc['doc']

  Rufus::Json.decode(doc)
end
do_get(type, key) click to toggle source
# File lib/ruote/ar/storage.rb, line 298
def do_get(type, key)
  decode_doc connection.select_one(table.project('*').
                                   where(table[:typ].eq(type).and(table[:ide].eq(key))).
                                   order(table[:rev].desc))
end
do_insert(doc, rev, update_rev=false) click to toggle source
# File lib/ruote/ar/storage.rb, line 273
def do_insert(doc, rev, update_rev=false)

  doc = doc.send(
    update_rev ? :merge! : :merge,
    {'_rev' => rev, 'put_at' => Ruote.now_to_utc_s}
  )
  
  m = Arel::InsertManager.new(Arel::Table.engine)
  m.into table
  m.insert [  
    [table[:ide], (doc['_id'] || '')],
    [table[:rev], (rev || '')],
    [table[:typ], (doc['type'] || '')],
    [table[:doc], (Rufus::Json.encode(doc) || '')],
    [table[:wfid], (extract_wfid(doc) || '')],
    [table[:participant_name], (doc['participant_name'] || '')]]

    connection.insert(m)
end
extract_wfid(doc) click to toggle source
# File lib/ruote/ar/storage.rb, line 294
def extract_wfid(doc)
  doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil)
end
select_last_revs(docs) click to toggle source

Weed out older docs (same ide, smaller rev).

This could all have been done via SQL, but those inconsistencies are rare, the cost of the pumped SQL is not constant :-(

# File lib/ruote/ar/storage.rb, line 321
def select_last_revs(docs)
  docs.each_with_object([]) { |doc,a|
    a << doc if a.last.nil? || doc['ide'] != a.last['ide']
  }
end

Private Instance Methods

connection() click to toggle source
# File lib/ruote/ar/storage.rb, line 332
def connection
  ::ActiveRecord::Base.connection
end
current_worker_name() click to toggle source
# File lib/ruote/ar/storage.rb, line 336
def current_worker_name
  worker = Thread.current['ruote_worker']
  if worker
    worker.name
  end || "worker"
end
table() click to toggle source
# File lib/ruote/ar/storage.rb, line 328
def table
  @table ||= ::Arel::Table.new @table_name
end