class OFlow::Actors::Recorder

Actor that saves records to the local file system as JSON. All records of a table are kept together in a single file named after the table, which is one of the elements of the triggering message. The message that triggers a store must have a 'table' element, a 'key' element, and a 'rec' element.

Attributes

dir[R]

Public Class Methods

new(task, options) click to toggle source

Initializes the recorder. @param [Hash] options with keys of

- :dir [String] directory to store the persisted records
                (default: 'db/' followed by the task's full name with ':' replaced by '/')
- :results_path [String] path to where the results should be placed in
                         the request (default: nil, which ships only the results)
Calls superclass method OFlow::Actor::new
# File lib/oflow/actors/recorder.rb, line 19
# Sets up the recorder: resolves the storage directory, remembers where
# results should be placed, and fills the in-memory cache from any
# '*.json' files already present in the directory.
#
# @param task [Object] owning task; its full_name is used to derive the
#   default directory when :dir is not given
# @param options [Hash] :dir and :results_path, both optional
def initialize(task, options)
  super
  @cache = {}
  @dir = options[:dir]
  # Default location is db/<task full name>, with ':' mapped to '/'.
  @dir = File.join('db', task.full_name.gsub(':', '/')) if @dir.nil?
  @dir = File.expand_path(@dir.strip)
  # Non-destructive strip: leaves the caller's string untouched and also
  # works when that string is frozen.
  @results_path = options[:results_path]
  @results_path = @results_path.strip unless @results_path.nil?

  if Dir.exist?(@dir)
    Dir.glob(File.join(@dir, '*.json')).each { |path| load(path) }
  else
    # The argument-list form of system bypasses the shell, so a directory
    # name containing spaces or shell metacharacters is created verbatim.
    system('mkdir', '-p', @dir)
  end
end

Public Instance Methods

clear(box) click to toggle source
# File lib/oflow/actors/recorder.rb, line 129
# Drops every cached table and removes everything stored on disk, leaving
# an empty storage directory behind.
#
# @param box [Object] ignored
# @return [nil]
def clear(box)
  @cache = {}
  # The argument-list form of system bypasses the shell, so a directory
  # name with spaces or shell metacharacters is passed as one argument and
  # cannot be split into several paths.
  system('rm', '-rf', @dir)
  # remake the dir in preparation for future inserts
  system('mkdir', '-p', @dir)
  nil
end
delete(box) click to toggle source
# File lib/oflow/actors/recorder.rb, line 102
# Removes the record stored under the box's 'key' in the box's 'table'
# and rewrites that table's file. A table that is not in the cache is
# left alone and nothing is written.
#
# @param box [Object] message responding to get('table') and get('key')
# @return [nil]
# @raise [KeyError] when the box has no 'table' or no 'key'
def delete(box)
  table = box.get('table')
  key = box.get('key')
  # Report this operation's own name, as insert and query do.
  raise KeyError.new(:delete) if table.nil? || key.nil?

  tc = @cache[table]
  unless tc.nil?
    tc.delete(key)
    write(table)
  end
  nil
end
insert(box) click to toggle source
# File lib/oflow/actors/recorder.rb, line 70
# Stores the box's 'rec' under its 'key' in its 'table', creating the table
# in the cache on first use, then writes the table out.
#
# @param box [Object] message responding to get('table'), get('key') and
#   get('rec')
# @return [Object] whatever write returns
# @raise [KeyError] when the box has no 'table' or no 'key'
def insert(box)
  table, key, rec = %w[table key rec].map { |field| box.get(field) }
  raise KeyError.new(:insert) if table.nil? || key.nil?

  @cache[table] = {} if @cache[table].nil?
  @cache[table][key] = rec
  write(table)
end
Also aliased as: update, insert_update
insert_update(box)
Alias for: insert
perform(op, box) click to toggle source
# File lib/oflow/actors/recorder.rb, line 39
# Runs the handler selected by op against box and, when the box contents
# name a :dest, ships the outcome there.
#
# With no results path configured the shipped box holds only the result
# (same tracker); otherwise the result is set into the incoming box at the
# results path.
#
# @param op [Symbol] one of :insert, :create, :get, :read, :update,
#   :insert_update, :delete, :remove, :query or :clear
# @param box [Object] incoming message
# @return [Object, nil] whatever task.ship returns, or nil when there is no
#   destination
# @raise [OpError] when op is not one of the operations above
def perform(op, box)
  dest = box.contents[:dest]
  result =
    case op
    when :insert, :create then insert(box)
    when :get, :read then read(box)
    when :update then update(box)
    when :insert_update then insert_update(box)
    when :delete, :remove then delete(box)
    when :query then query(box)
    when :clear then clear(box)
    else raise OpError.new(task.full_name, op)
    end
  return nil if dest.nil?

  outgoing = @results_path.nil? ? Box.new(result, box.tracker) : box.set(@results_path, result)
  task.ship(dest, outgoing)
end
query(box) click to toggle source
# File lib/oflow/actors/recorder.rb, line 116
# Collects the records of the box's 'table' that satisfy the box's 'expr'.
# Without an 'expr' every record of the table is returned. A table that is
# not in the cache yields an empty result instead of failing, matching how
# read and delete treat unknown tables.
#
# @param box [Object] message responding to get('table') and get('expr');
#   'expr', when present, is called as expr.call(rec, key)
# @return [Hash] new hash of key => record for the matching records
# @raise [KeyError] when the box has no 'table'
def query(box)
  expr = box.get('expr')
  table = box.get('table')
  raise KeyError.new(:query) if table.nil?

  tc = @cache[table]
  return {} if tc.nil?

  tc.each_with_object({}) do |(key, rec), recs|
    recs[key] = rec if expr.nil? || expr.call(rec, key)
  end
end
read(box) click to toggle source
# File lib/oflow/actors/recorder.rb, line 89
# Looks up the record stored under the box's 'key' in the box's 'table'.
#
# @param box [Object] message responding to get('table') and get('key')
# @return [Object, nil] the cached record; nil when the table is not in the
#   cache
# @raise [KeyError] when the box has no 'table' or no 'key'
def read(box)
  table = box.get('table')
  key = box.get('key')
  raise KeyError.new(:read) if table.nil? || key.nil?

  records = @cache[table]
  records.nil? ? nil : records[key]
end
update(box)
Alias for: insert

Private Instance Methods

load(path) click to toggle source
# File lib/oflow/actors/recorder.rb, line 145
# Reads one table file into the cache. The cache entry is named after the
# file's base name with its extension removed.
#
# NOTE(review): symbol_keys: true presumably symbolizes the keys of the
# loaded hash as well — confirm lookups after a reload use matching keys.
#
# @param path [String] path of the JSON file to load
# @return [Object, nil] the loaded content, or nil when the file does not
#   exist
def load(path)
  return nil unless File.exist?(path)

  records = Oj.load_file(path, mode: :strict, symbol_keys: true)
  table = File.basename(path, File.extname(path))
  @cache[table] = records
end
write(table) click to toggle source
# File lib/oflow/actors/recorder.rb, line 139
# Writes the cached content of one table to '<table>.json' inside the
# recorder's directory.
#
# NOTE(review): table goes into the file name unchecked — confirm callers
# never pass a name containing path separators.
#
# @param table [String] name of the table to write
# @return [Object] whatever Oj.to_file returns
def write(table)
  Oj.to_file(File.join(@dir, "#{table}.json"), @cache[table], mode: :strict)
end