class KeenCli::BatchProcessor

Constants

DEFAULT_BATCH_SIZE

defaults

DEFAULT_CSV_OPTIONS

Attributes

batch_size[RW]
collection[RW]

Public options, set in the constructor

csv[RW]
csv_options[RW]
events[RW]
params[RW]
pretty[RW]
silent[RW]
size[RW]

Internal state tracking

total_size[RW]

Public Class Methods

new(collection, options={})
# File lib/keen-cli/batch_processor.rb, line 23
# Build a processor that buffers events for `collection` and publishes
# them in batches.
#
# Options read from `options`:
#   :'batch-size'    - events per batch; DEFAULT_BATCH_SIZE when absent or nil
#   :csv             - when truthy, input lines are parsed as CSV
#   :params          - when truthy, input lines are parsed as querystring data
#   :csv_options     - merged over DEFAULT_CSV_OPTIONS (caller's keys win)
#   :pretty, :silent - forwarded to Utils.out_json when printing publish results
def initialize(collection, options={})
  user_csv_options = options[:csv_options] || {}

  self.collection  = collection
  self.batch_size  = options[:'batch-size'] || DEFAULT_BATCH_SIZE
  self.csv         = options[:csv]
  self.params      = options[:params]
  self.csv_options = DEFAULT_CSV_OPTIONS.merge(user_csv_options)
  self.pretty      = options[:pretty]
  self.silent      = options[:silent]

  # Event buffer and running counters; reset zeroes the per-batch size.
  self.events     = []
  self.total_size = 0
  reset
end

Public Instance Methods

add(line)
# File lib/keen-cli/batch_processor.rb, line 36
# Parse one input line and buffer the resulting event(s).
#
# The line is read as CSV when `csv` is set, as querystring data when
# `params` is set, and as JSON otherwise. In CSV mode without an explicit
# :headers entry in csv_options, the line is consumed as the header row
# and produces no event.
#
# @param line [String] one line of input
# @raise [RuntimeError] if a CSV data line cannot be parsed
def add(line)
  # No headers known yet: this line is the header row.
  if self.csv && !self.csv_options.key?(:headers)
    set_headers(line)
    return
  end

  if self.csv
    # CSV.parse_line takes keyword options. Ruby 3 no longer converts a
    # positional Hash into keywords, so the options must be double-splatted.
    csv_row = CSV.parse_line(line, **self.csv_options)
    raise "Could not parse! #{line}" unless csv_row

    add_event_and_flush_if_necessary(row_to_hash(csv_row))
  elsif self.params
    add_event_and_flush_if_necessary(Utils.parse_data_as_querystring(line))
  else
    json_object = JSON.parse(line)

    # A JSON array on a single line is treated as several events.
    if json_object.is_a?(Array)
      json_object.each { |json_event| add_event_and_flush_if_necessary(json_event) }
    else
      add_event_and_flush_if_necessary(json_object)
    end
  end
end
flush()
# File lib/keen-cli/batch_processor.rb, line 76
# Publish the buffered events as one batch to `collection`, then start a
# fresh batch. publish_batch is skipped when nothing has been buffered.
def flush
  publish_batch(collection, events) if size.positive?
  reset
end
reset()
# File lib/keen-cli/batch_processor.rb, line 81
# Begin a new, empty batch: zero the per-batch counter and empty the
# existing events array in place. total_size is left untouched.
def reset
  self.size = 0
  events.clear
end

Private Instance Methods

add_event_and_flush_if_necessary(event)
# File lib/keen-cli/batch_processor.rb, line 88
# Normalize one event, append it to the current batch, and flush once the
# batch reaches batch_size.
#
# When the event has a "keen" property, its "id" and "created_at" entries
# are removed (NOTE(review): presumably because they are server-assigned
# and must not be re-sent; confirm). keen.location.coordinates is then
# normalized by normalize_location_coordinates.
#
# @param event [Hash] the event to buffer; mutated in place
def add_event_and_flush_if_necessary(event)
  keen = event["keen"]

  if keen
    keen.delete("id")
    keen.delete("created_at")
    normalize_location_coordinates(keen)
  end

  self.events.push(event)
  self.size += 1
  self.total_size += 1
  self.flush if self.size >= self.batch_size
end

# Normalize keen.location.coordinates in place:
# - a string such as "[1.5, -2]" becomes an Array of Floats (String#to_f,
#   so non-numeric entries become 0.0);
# - an Array is kept unchanged;
# - any other value (e.g. "1,2" or a number) removes keen.location.
# Does nothing when location or coordinates is absent.
def normalize_location_coordinates(keen)
  location = keen["location"]
  return unless location

  coordinates = location["coordinates"]
  return unless coordinates

  if coordinates.is_a?(String) && coordinates.start_with?("[") && coordinates.end_with?("]")
    location["coordinates"] = coordinates[1...-1].split(",").map(&:to_f)
  elsif !coordinates.is_a?(Array)
    keen.delete("location")
  end
end
publish_batch(collection, events)
# File lib/keen-cli/batch_processor.rb, line 118
# Send `events` to Keen as a single batch keyed by `collection`, print the
# API response via Utils.out_json (honouring pretty/silent), and return
# that response.
def publish_batch(collection, events)
  response = Keen.publish_batch({ collection => events })
  Utils.out_json(response, :pretty => self.pretty, :silent => self.silent)
  response
end
row_to_hash(csv_row)
# File lib/keen-cli/batch_processor.rb, line 126
# Convert a CSV row into a (possibly nested) event hash.
#
# Dotted header names become nested keys, so a column "a.b" with value v
# yields {"a" => {"b" => v}}. The per-column hashes are combined with
# deep_merge. A row with no columns yields an empty Hash. A header with no
# name segments (nil, "" or ".") is kept as a flat key rather than passing
# a bare value into deep_merge.
#
# NOTE(review): Hash#deep_merge is not core Ruby; it is assumed to be
# provided elsewhere (e.g. ActiveSupport). Confirm.
#
# @param csv_row [CSV::Row, #to_hash]
# @return [Hash]
def row_to_hash(csv_row)
  nested_columns = csv_row.to_hash.map do |column, value|
    path = column.to_s.split('.')
    # "".split('.') and ".".split('.') return [], which would leave `value` un-wrapped.
    path = [column.to_s] if path.empty?
    path.reverse.inject(value) { |inner, key| { key => inner } }
  end

  # Seed with {} so an empty row returns {} instead of nil.
  nested_columns.inject({}, &:deep_merge)
end
set_headers(line)
# File lib/keen-cli/batch_processor.rb, line 113
# Parse `line` as the CSV header row and store the field names in
# csv_options[:headers], so later lines are parsed as data rows.
#
# @param line [String] the header line
# @raise [RuntimeError] if the line yields no fields (e.g. a blank line),
#   instead of silently storing empty headers that break every later row
def set_headers(line)
  # Double-splat: CSV.parse_line takes keyword options, and Ruby 3 does not
  # convert a positional Hash into keywords.
  csv_row = CSV.parse_line(line, **self.csv_options)
  raise "Could not parse headers! #{line}" unless csv_row

  self.csv_options[:headers] = csv_row.to_a
end