class LogStash::Outputs::Cassandra::EventParser
Responsible for accepting events from the pipeline and returning actions for the SafeSubmitter
Public Class Methods
new(options)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 8 def initialize(options) @logger = options['logger'] @table = options['table'] @filter_transform_event_key = options['filter_transform_event_key'] assert_filter_transform_structure(options['filter_transform']) if options['filter_transform'] @filter_transform = options['filter_transform'] @hints = options['hints'] @ignore_bad_values = options['ignore_bad_values'] end
Public Instance Methods
parse(event)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 18 def parse(event) action = {} begin action['table'] = event.sprintf(@table) filter_transform = get_filter_transform(event) if filter_transform action['data'] = {} filter_transform.each { |filter| add_event_value_from_filter_to_action(event, filter, action) } else add_event_data_using_configured_hints(event, action) end @logger.debug('event parsed to action', :action => action) rescue Exception => e @logger.error('failed parsing event', :event => event, :error => e) action = nil end action end
Private Instance Methods
add_event_data_using_configured_hints(event, action)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 72 def add_event_data_using_configured_hints(event, action) action_data = event.to_hash.reject { |key| %r{^@} =~ key } @hints.each do |event_key, cassandra_type| if action_data.has_key?(event_key) action_data[event_key] = convert_value_to_cassandra_type_or_default_if_configured(action_data[event_key], cassandra_type) end end action['data'] = action_data end
add_event_value_from_filter_to_action(event, filter, action)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 59 def add_event_value_from_filter_to_action(event, filter, action) event_data = event.sprintf(filter['event_key']) unless filter.fetch('expansion_only', false) event_data = event.get(event_data) end if filter.has_key?('cassandra_type') cassandra_type = event.sprintf(filter['cassandra_type']) event_data = convert_value_to_cassandra_type_or_default_if_configured(event_data, cassandra_type) end column_name = event.sprintf(filter['column_name']) action['data'][column_name] = event_data end
assert_filter_transform_structure(filter_transform)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 51 def assert_filter_transform_structure(filter_transform) filter_transform.each { |item| if !item.has_key?('event_key') || !item.has_key?('column_name') raise ArgumentError, "item is incorrectly configured in filter_transform:\nitem => #{item}\nfilter_transform => #{filter_transform}" end } end
convert_value_to_cassandra_type(event_data, cassandra_type)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 111 def convert_value_to_cassandra_type(event_data, cassandra_type) case cassandra_type when 'timestamp' converted_value = event_data if converted_value.is_a?(Numeric) converted_value = Time.at(converted_value) elsif converted_value.respond_to?(:to_s) converted_value = Time::parse(event_data.to_s) end return ::Cassandra::Types::Timestamp.new(converted_value) when 'inet' return ::Cassandra::Types::Inet.new(event_data) when 'float' return ::Cassandra::Types::Float.new(event_data) when 'text' return ::Cassandra::Types::Text.new(event_data) when 'blob' return ::Cassandra::Types::Blob.new(event_data) when 'ascii' return ::Cassandra::Types::Ascii.new(event_data) when 'bigint' return ::Cassandra::Types::Bigint.new(event_data) when 'counter' return ::Cassandra::Types::Counter.new(event_data) when 'int' return ::Cassandra::Types::Int.new(event_data) when 'varint' return ::Cassandra::Types::Varint.new(event_data) when 'boolean' return ::Cassandra::Types::Boolean.new(event_data) when 'decimal' return ::Cassandra::Types::Decimal.new(event_data) when 'double' return ::Cassandra::Types::Double.new(event_data) when 'timeuuid' return ::Cassandra::Types::Timeuuid.new(event_data) when /^set<(.*)>$/ # convert each value # then add all to an array and convert to set converted_items = ::Set.new set_type = $1 event_data.each { |item| converted_item = convert_value_to_cassandra_type(item, set_type) converted_items.add(converted_item) } return converted_items else raise "Unknown cassandra_type #{name}" end end
convert_value_to_cassandra_type_or_default_if_configured(event_data, cassandra_type)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 83 def convert_value_to_cassandra_type_or_default_if_configured(event_data, cassandra_type) typed_event_data = nil begin typed_event_data = convert_value_to_cassandra_type(event_data, cassandra_type) rescue Exception => e error_message = "Cannot convert `value (`#{event_data}`) to `#{cassandra_type}` type" if @ignore_bad_values case cassandra_type when 'float', 'int', 'varint', 'bigint', 'double', 'counter', 'timestamp' typed_event_data = convert_value_to_cassandra_type(0, cassandra_type) when 'timeuuid' typed_event_data = convert_value_to_cassandra_type('00000000-0000-0000-0000-000000000000', cassandra_type) when 'inet' typed_event_data = convert_value_to_cassandra_type('0.0.0.0', cassandra_type) when /^set<.*>$/ typed_event_data = convert_value_to_cassandra_type([], cassandra_type) else raise ArgumentError, "unable to provide a default value for type #{event_data}" end @logger.warn(error_message, :exception => e, :backtrace => e.backtrace) else @logger.error(error_message, :exception => e, :backtrace => e.backtrace) raise error_message end end typed_event_data end
get_filter_transform(event)
click to toggle source
# File lib/logstash/outputs/cassandra/event_parser.rb, line 40 def get_filter_transform(event) filter_transform = nil if @filter_transform_event_key filter_transform = event.get(@filter_transform_event_key) assert_filter_transform_structure(filter_transform) elsif @filter_transform.length > 0 filter_transform = @filter_transform end filter_transform end