class OFlow::Actors::Persister
Actor
that persists records to the local file system as JSON representations of the records. Records can be the whole contents of the box received or a sub element of the contents. The key to the records are keys provided either in the record data or outside the data but somewhere else in the box received. Options for maintaining historic records and sequence number locking are included. If no sequence number is provide the Persister
will assume there is no checking required and write anyway.
Records are stored as JSON with the filename as the key and sequence number. The format of the file name is <key>~<seq>.json. As an example, a record stored with a key of 'first' and a sequence number of 3 (third time saved) would be 'first~3.json.
Attributes
Public Class Methods
Initializes the persister with options of: @param [Hash] options with keys of
- :dir [String] directory to store the persisted records - :key_data [String] path to record data (default: nil (all)) - :key_path [String] path to key for the record (default: 'key') - :seq_path [String] path to sequence for the record (default: 'seq') - :results_path [String] path to where the results should be placed in the request (default: nil or ship only results) - :cache [Boolean] if true, cache records in memory - :historic [Boolean] if true, do not delete previous versions
OFlow::Actor::new
# File lib/oflow/actors/persister.rb, line 36 def initialize(task, options) super @dir = options[:dir] if @dir.nil? @dir = File.join('db', task.full_name.gsub(':', '/')) end @dir = File.expand_path(@dir.strip) @key_path = options.fetch(:key_path, 'key').strip @seq_path = options.fetch(:seq_path, 'seq').strip @data_path = options.fetch(:data_path, nil) # nil means all contents @data_path.strip! unless @data_path.nil? @results_path = options[:results_path] @results_path.strip! unless @results_path.nil? if options.fetch(:cache, true) # key is record key, value is [seq, rec] @cache = {} else @cache = nil end @historic = ('true' == options.fetch(:historic, 'false').to_s) if Dir.exist?(@dir) unless @cache.nil? Dir.glob(File.join(@dir, '**', '*.json')).each do |path| if File.symlink?(path) rec = load(path) unless @cache.nil? key, seq = key_seq_from_path(path) @cache[key] = [seq, rec] end end end end else `mkdir -p #{@dir}` end end
Public Instance Methods
Returns true if the actor is caching records.
# File lib/oflow/actors/persister.rb, line 116 def caching?() !@cache.nil? end
# File lib/oflow/actors/persister.rb, line 220 def clear(box) @cache = {} unless @cache.nil? `rm -rf #{@dir}` # remake the dir in preparation for future inserts `mkdir -p #{@dir}` nil end
# File lib/oflow/actors/persister.rb, line 177 def delete(box) key = box.get(@key_path) @cache.delete(key) unless @cache.nil? linkpath = File.join(@dir, "#{key}.json") File.delete(linkpath) delete_historic(key, nil) unless @historic nil end
# File lib/oflow/actors/persister.rb, line 249 def delete_historic(key, seq) Dir.glob(File.join(@dir, '**', "#{key}~*.json")).each do |path| _, s = key_seq_from_path(path) next if s == seq File.delete(path) end end
# File lib/oflow/actors/persister.rb, line 106 def insert(box) key = box.get(@key_path) raise KeyError.new(:insert) if key.nil? box = box.set(@seq_path, 1) rec = box.get(@data_path) @cache[key] = [1, rec] unless @cache.nil? save(rec, key, 1) end
# File lib/oflow/actors/persister.rb, line 169 def insert_update(box) begin insert(box) rescue ExistsError update(box) end end
# File lib/oflow/actors/persister.rb, line 257 def key_seq_from_path(path) path = File.readlink(path) if File.symlink?(path) base = File.basename(path)[0..-6] # strip off '.json' a = base.split('~') [a[0..-2].join('~'), a[-1].to_i] end
# File lib/oflow/actors/persister.rb, line 244 def load(path) return nil unless File.exist?(path) Oj.load_file(path, :mode => :object) end
# File lib/oflow/actors/persister.rb, line 75 def perform(op, box) dest = box.contents[:dest] result = nil case op when :insert, :create result = insert(box) when :get, :read result = read(box) when :update result = update(box) when :insert_update result = insert_update(box) when :delete, :remove result = delete(box) when :query result = query(box) when :clear result = clear(box) else raise OpError.new(task.full_name, op) end unless dest.nil? if @results_path.nil? box = Box.new(result, box.tracker) else box = box.set(@results_path, result) end task.ship(dest, box) end end
# File lib/oflow/actors/persister.rb, line 186 def query(box) recs = {} expr = box.get('expr') if expr.nil? if @cache.nil? Dir.glob(File.join(@dir, '**/*.json')).each do |path| recs[File.basename(path)[0..-6]] = load(path) if File.symlink?(path) end else @cache.each do |key,seq_rec| recs[key] = seq_rec[1] end end elsif expr.is_a?(Proc) if @cache.nil? Dir.glob(File.join(@dir, '**/*.json')).each do |path| next unless File.symlink?(path) rec = load(path) key, seq = key_seq_from_path(path) recs[key] = rec if expr.call(rec, key, seq) end else @cache.each do |key,seq_rec| rec = seq_rec[1] recs[key] = rec if expr.call(rec, key, seq_rec[0]) end end else # TBD add support for string safe expressions in the future raise Exception.new("expr can only be a Proc, not a #{expr.class}") end recs end
# File lib/oflow/actors/persister.rb, line 120 def read(box) # Should be a Hash. key = box.contents[:key] raise KeyError(:read) if key.nil? if @cache.nil? linkpath = File.join(@dir, "#{key}.json") rec = load(linkpath) else unless (seq_rec = @cache[key]).nil? rec = seq_rec[1] end end # If not found rec will be nil, that is okay. rec end
internal use only
# File lib/oflow/actors/persister.rb, line 229 def save(rec, key, seq) filename = "#{key}~#{seq}.json" path = File.join(@dir, filename) linkpath = File.join(@dir, "#{key}.json") raise ExistsError.new(key, seq) if File.exist?(path) Oj.to_file(path, rec, :mode => :object) begin File.delete(linkpath) rescue Exception # ignore end File.symlink(filename, linkpath) rec end
# File lib/oflow/actors/persister.rb, line 136 def update(box) key = box.get(@key_path) raise KeyError.new(:update) if key.nil? seq = box.get(@seq_path) if @cache.nil? if (seq_rec = @cache[key]).nil? raise NotFoundError.new(key) end seq = seq_rec[0] if seq.nil? else seq = 0 has_rec = false Dir.glob(File.join(@dir, '**', "#{key}*.json")).each do |path| if File.symlink?(path) has_rec = true next end _, s = key_seq_from_path(path) seq = s if seq < s end end raise NotFoundError.new(key) unless has_rec raise SeqError.new(:update, key) if seq.nil? || 0 == seq seq += 1 box = box.set(@seq_path, seq) rec = box.get(@data_path) @cache[key] = [seq, rec] unless @cache.nil? rec = save(rec, key, seq) delete_historic(key, seq) unless @historic rec end