class OFlow::Actors::Recorder
Actor
that saves records to the local file system as JSON. All records of a table are kept together in a single file named after the table. The message that triggers a store must have a 'table' element, a 'key' element, and a 'rec' element.
Attributes
dir[R]
Public Class Methods
new(task, options)
click to toggle source
Initializes the recorder. @param [Hash] options with keys of:
- :dir [String] directory to store the persisted records (default: 'db/' followed by the task's full name with ':' replaced by '/') - :results_path [String] path to where the results should be placed in the request (default: nil, in which case only the results are shipped)
Calls superclass method
OFlow::Actor::new
# File lib/oflow/actors/recorder.rb, line 19
# Sets up the recorder: resolves the storage directory, remembers where
# results should be placed, and primes the in-memory cache from any
# '*.json' files already present in that directory.
#
# @param task [Object] owning task; its full_name is used to build the
#   default directory when options[:dir] is not given
# @param options [Hash] recognised keys:
#   - :dir [String] directory holding the persisted tables
#     (default: 'db/' plus the task's full name with ':' replaced by '/')
#   - :results_path [String] where results are placed in the shipped box
#     (default: nil)
def initialize(task, options)
  super
  @cache = {}
  @dir = options[:dir]
  @dir = File.join('db', task.full_name.gsub(':', '/')) if @dir.nil?
  @dir = File.expand_path(@dir.strip)
  # Non-destructive strip: the caller's option string is left untouched, and
  # a frozen string no longer raises.
  @results_path = options[:results_path]
  @results_path = @results_path.strip unless @results_path.nil?
  if Dir.exist?(@dir)
    Dir.glob(File.join(@dir, '*.json')).each { |path| load(path) }
  else
    # Argument-list form of system bypasses the shell, so directories with
    # spaces or shell metacharacters are created exactly as named.
    system('mkdir', '-p', @dir)
  end
end
Public Instance Methods
clear(box)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 129
# Drops every cached table and removes all persisted files, leaving an
# empty storage directory behind.
#
# @param box [Object] triggering message; not consulted by this operation
# @return [nil]
def clear(box)
  @cache = {}
  # Argument-list form of system bypasses the shell, so a directory name
  # containing spaces or metacharacters cannot be split or reinterpreted
  # into a different rm/mkdir target.
  system('rm', '-rf', @dir)
  # remake the dir in preparation for future inserts
  system('mkdir', '-p', @dir)
  nil
end
delete(box)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 102
# Removes one record from a table and rewrites that table's file. A table
# that is not in the cache is ignored.
#
# @param box [Object] message providing 'table' and 'key' through #get
# @return [nil]
# @raise [KeyError] tagged :delete when 'table' or 'key' is missing
def delete(box)
  table = box.get('table')
  key = box.get('key')
  # Tag the error with this operation; it previously reported :read.
  raise KeyError.new(:delete) if table.nil?
  raise KeyError.new(:delete) if key.nil?

  tc = @cache[table]
  unless tc.nil?
    tc.delete(key)
    write(table)
  end
  nil
end
insert(box)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 70
# Stores a record under its key in the named table, creating the table's
# cache entry on first use, then rewrites the table's file.
#
# @param box [Object] message providing 'table', 'key' and 'rec' through #get
# @return whatever #write returns for the table
# @raise [KeyError] tagged :insert when 'table' or 'key' is missing
def insert(box)
  table, key, rec = %w[table key rec].map { |field| box.get(field) }
  raise KeyError.new(:insert) if table.nil? || key.nil?

  rows = @cache[table]
  rows = @cache[table] = {} if rows.nil?
  rows[key] = rec
  write(table)
end
Also aliased as: update, insert_update
perform(op, box)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 39
# Runs the storage operation named by op and, when the box names a
# destination in contents[:dest], ships the outcome there.
#
# With no results path configured the shipped box holds only the result
# (keeping the incoming tracker); otherwise the result is set on the
# incoming box at the results path.
#
# @param op [Symbol] one of :insert, :create, :get, :read, :update,
#   :insert_update, :delete, :remove, :query or :clear
# @param box [Object] the triggering message
# @raise [OpError] when op is not one of the operations above
def perform(op, box)
  dest = box.contents[:dest]
  result =
    case op
    when :insert, :create then insert(box)
    when :get, :read then read(box)
    when :update then update(box)
    when :insert_update then insert_update(box)
    when :delete, :remove then delete(box)
    when :query then query(box)
    when :clear then clear(box)
    else raise OpError.new(task.full_name, op)
    end
  return if dest.nil?

  outgoing =
    if @results_path.nil?
      Box.new(result, box.tracker)
    else
      box.set(@results_path, result)
    end
  task.ship(dest, outgoing)
end
query(box)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 116
# Collects the records of a table, optionally filtered by a callable.
#
# @param box [Object] message providing 'table' and, optionally, 'expr'
#   through #get; 'expr' is called as expr.call(rec, key) and a truthy
#   result keeps the record
# @return [Hash] matching records by key; empty when the table is not in
#   the cache
# @raise [KeyError] tagged :query when 'table' is missing
def query(box)
  expr = box.get('expr')
  table = box.get('table')
  raise KeyError.new(:query) if table.nil?

  tc = @cache[table]
  # A table nobody has inserted into has no records; it is not an error.
  return {} if tc.nil?

  tc.select { |key, rec| expr.nil? || expr.call(rec, key) }
end
read(box)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 89
# Looks up a single record in the cache.
#
# @param box [Object] message providing 'table' and 'key' through #get
# @return the cached record, or nil when the table or the key is unknown
# @raise [KeyError] tagged :read when 'table' or 'key' is missing
def read(box)
  table = box.get('table')
  key = box.get('key')
  raise KeyError.new(:read) if table.nil? || key.nil?

  rows = @cache[table]
  rows.nil? ? nil : rows[key]
end
Private Instance Methods
load(path)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 145
# Reads one persisted table file into the cache, keyed by the file's base
# name with its extension removed.
#
# NOTE(review): symbol_keys: true presumably turns string keys in the file
# into symbols, which may differ from the keys used at insert time - confirm
# against callers.
#
# @param path [String] path of the JSON file to load
# @return the loaded table, or nil when the file does not exist
def load(path)
  return nil unless File.exist?(path)

  table_name = File.basename(path, File.extname(path))
  @cache[table_name] = Oj.load_file(path, mode: :strict, symbol_keys: true)
end
write(table)
click to toggle source
# File lib/oflow/actors/recorder.rb, line 139
# Persists the cached contents of one table to '<table>.json' inside the
# recorder's directory.
#
# @param table [String] name of the table to persist
# @return whatever Oj.to_file returns
def write(table)
  Oj.to_file(File.join(@dir, "#{table}.json"), @cache[table], mode: :strict)
end