class KeenCli::BatchProcessor
Constants
- DEFAULT_BATCH_SIZE
Default number of events per batch, used when no :'batch-size' option is given to the constructor.
- DEFAULT_CSV_OPTIONS
Attributes
batch_size[RW]
collection[RW]
Public options, all set in the constructor.
csv[RW]
csv_options[RW]
events[RW]
params[RW]
pretty[RW]
silent[RW]
size[RW]
Internal state tracking: number of events buffered in the current batch.
total_size[RW]
Public Class Methods
new(collection, options={})
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 23
# Build a processor that buffers events for +collection+.
#
# Recognised option keys: :'batch-size' (falls back to DEFAULT_BATCH_SIZE),
# :csv, :params, :csv_options (merged over DEFAULT_CSV_OPTIONS), :pretty and
# :silent. Counters start at zero and the event buffer starts empty.
#
# @param collection [String] target collection name
# @param options [Hash] processor options, keyed by symbol
def initialize(collection, options={})
  self.collection  = collection
  self.batch_size  = options[:'batch-size'] || DEFAULT_BATCH_SIZE

  # Input-mode and output flags.
  self.csv         = options[:csv]
  self.params      = options[:params]
  self.pretty      = options[:pretty]
  self.silent      = options[:silent]

  # merge returns a fresh Hash, so later header detection never mutates
  # the shared default constant.
  self.csv_options = DEFAULT_CSV_OPTIONS.merge(options[:csv_options] || {})

  self.events      = []
  self.total_size  = 0
  reset
end
Public Instance Methods
add(line)
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 36
# Parse one line of input and queue the resulting event(s), flushing the
# buffer whenever it reaches batch_size.
#
# How the line is interpreted depends on the processor's mode:
# * CSV (+csv+ set): unless csv_options already has :headers, the line is
#   taken as the header row and nothing is queued; otherwise it is parsed
#   as a data row and converted with row_to_hash.
# * querystring (+params+ set): parsed by Utils.parse_data_as_querystring.
# * otherwise JSON: a single object is queued as one event; an array has
#   each of its elements queued as a separate event.
#
# @param line [String] one line of raw input
# @raise [RuntimeError] if a CSV line cannot be parsed
# @raise [JSON::ParserError] if a JSON line is malformed
def add(line)
  if csv
    # Without explicit headers, the first CSV line names the columns.
    unless csv_options.key?(:headers)
      set_headers(line)
      return
    end

    # csv_options must be splatted: CSV.parse_line takes keyword options, and
    # Ruby 3 no longer converts a trailing positional Hash into keywords.
    csv_row = CSV.parse_line(line, **csv_options)
    raise "Could not parse! #{line}" unless csv_row

    add_event_and_flush_if_necessary(row_to_hash(csv_row))
  elsif params
    add_event_and_flush_if_necessary(Utils.parse_data_as_querystring(line))
  else
    json_object = JSON.parse(line)
    if json_object.is_a?(Array)
      # One line may carry several events; queue (and flush) each in turn.
      json_object.each { |json_event| add_event_and_flush_if_necessary(json_event) }
    else
      add_event_and_flush_if_necessary(json_object)
    end
  end
end
flush()
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 76
# Send whatever is buffered to the processor's collection, skipping the
# publish entirely when nothing is buffered, and then start a fresh batch.
def flush
  publish_batch(collection, events) if size.positive?
  reset
end
reset()
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 81
# Start a new, empty batch.
#
# The per-batch counter goes back to zero and the events array is emptied
# in place (the same Array object is kept). total_size is not touched.
# Returns the (now empty) events array.
def reset
  self.size = 0
  events.clear
end
Private Instance Methods
add_event_and_flush_if_necessary(event)
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 88
# Normalize one event, append it to the current batch, and flush once the
# batch holds at least batch_size events.
#
# Normalization only touches the event's "keen" entry, when present:
# * its "id" and "created_at" keys are removed;
# * a "location" => "coordinates" value written as a bracketed string such
#   as "[-122.4, 37.7]" is replaced with an Array of Floats;
# * a coordinates value that is neither a bracketed string nor an Array
#   causes the whole "location" entry to be dropped.
#
# @param event [Hash] the parsed event; modified in place
def add_event_and_flush_if_necessary(event)
  keen = event["keen"]
  if keen
    keen.delete("id")
    keen.delete("created_at")

    location = keen["location"]
    coordinates = location && location["coordinates"]
    if coordinates
      if coordinates[0] == "[" && coordinates[-1] == "]"
        # Strip the brackets and turn "a,b" into [a.to_f, b.to_f].
        location["coordinates"] = coordinates[1..-2].split(',').map(&:to_f)
      elsif !coordinates.instance_of?(Array)
        # Should never happen, but just in case: drop the malformed location.
        keen.delete("location")
      end
    end
  end

  events.push(event)
  self.size += 1
  self.total_size += 1
  flush if size >= batch_size
end
publish_batch(collection, events)
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 118
# Send +events+ to +collection+ in a single Keen.publish_batch call, print
# the response through Utils.out_json (honouring the pretty/silent flags),
# and return that response.
#
# @param collection [String] collection name used as the batch key
# @param events [Array] events to publish
# @return whatever Keen.publish_batch returned
def publish_batch(collection, events)
  response = Keen.publish_batch({ collection => events })
  Utils.out_json(response, pretty: pretty, silent: silent)
  response
end
row_to_hash(csv_row)
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 126
# Convert a parsed CSV row into a (possibly nested) event hash.
#
# Dots in a header name denote nesting, so a column named
# "keen.location.city" becomes {"keen" => {"location" => {"city" => value}}}.
# Columns are folded left to right: nested hashes are merged, and for any
# other conflict the later column wins.
#
# Columns whose header is blank or nil (e.g. produced by a trailing comma in
# the header line) carry no key path and are skipped. Previously they turned
# into bare values that crashed the merge. A row with no usable columns
# yields {} instead of nil.
#
# @param csv_row [CSV::Row, #to_hash] the parsed row
# @return [Hash] the nested event
def row_to_hash(csv_row)
  csv_row.to_hash.inject({}) do |event, (header, value)|
    path = header.to_s.split('.')
    next event if path.empty?

    nested = path.reverse.inject(value) { |inner, key| { key => inner } }
    deep_merge_hashes(event, nested)
  end
end

# Recursively merge +other+ into a copy of +base+: when both sides hold a
# Hash under the same key they are merged, otherwise +other+'s value wins.
def deep_merge_hashes(base, other)
  base.merge(other) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      deep_merge_hashes(old_value, new_value)
    else
      new_value
    end
  end
end
set_headers(line)
click to toggle source
# File lib/keen-cli/batch_processor.rb, line 113
# Use +line+ as the CSV header row.
#
# The line is parsed with the current csv_options, and the resulting column
# names are stored under csv_options[:headers]. Later CSV.parse_line calls
# therefore return rows keyed by those names.
#
# A line that parses to no fields (an empty line) leaves :headers unset, so
# the next line is tried as the header row instead of recording zero columns.
#
# @param line [String] the raw header line
def set_headers(line)
  # Splat the options: CSV.parse_line takes keyword options, and Ruby 3 no
  # longer converts a trailing positional Hash into keywords.
  header_row = CSV.parse_line(line, **csv_options)
  return if header_row.nil? || header_row.empty?

  csv_options[:headers] = header_row.to_a
end