class LogStash::Codecs::KafkaTimeMachine
Public Instance Methods
decode(data)
click to toggle source
# File lib/logstash/codecs/kafkatimemachine.rb, line 39
# Decoding is not supported: every call raises a RuntimeError carrying the
# message "Not implemented". The +data+ argument is never read.
#
# @param data [Object] ignored
# @raise [RuntimeError] always
def decode(data)
  raise RuntimeError, "Not implemented"
end
encode(event)
click to toggle source
# File lib/logstash/codecs/kafkatimemachine.rb, line 44
# Builds one InfluxDB line-protocol "kafka_lag_time" point from the Kafka
# timing values stored under [@metadata] on +event+ and passes it to @on_event.
#
# Two groups of metadata are read: "producer" and "aggregate". Each group is
# valid only when all of its values (datacenter, topic, consumer group, append
# time, read time) are present, non-empty, and the two times parse as numbers.
# The datacenter value comes from kafka_datacenter_producer for BOTH groups.
#
# * both groups valid  -> ktm_lag_type=complete (total, aggregate, producer lag)
# * only producer valid -> ktm_lag_type=producer
# * only aggregate valid -> ktm_lag_type=aggregate
# * neither valid       -> an error is logged and nothing is emitted
#
# When @enable_log is true the line is also appended via #file_output.
#
# @param event [Object] must respond to #get(field_reference) and #sprintf(string)
# @return [Object, nil] whatever @on_event.call returns, or nil when no valid
#   line could be built
def encode(event)
  # The producer datacenter is shared by the producer and aggregate groups.
  kafka_datacenter_producer = event.get("[@metadata][kafka_datacenter_producer]")

  kafka_topic_producer = event.get("[@metadata][kafka_topic_producer]")
  kafka_consumer_group_producer = event.get("[@metadata][kafka_consumer_group_producer]")
  producer = ktm_stage_lag(
    "producer",
    [
      kafka_datacenter_producer,
      kafka_topic_producer,
      kafka_consumer_group_producer,
      ktm_float(event.get("[@metadata][kafka_append_time_producer]")),
      ktm_float(event.get("[@metadata][logstash_kafka_read_time_producer]"))
    ]
  )

  kafka_topic_aggregate = event.get("[@metadata][kafka_topic_aggregate]")
  kafka_consumer_group_aggregate = event.get("[@metadata][kafka_consumer_group_aggregate]")
  aggregate = ktm_stage_lag(
    "aggregate",
    [
      kafka_datacenter_producer,
      kafka_topic_aggregate,
      kafka_consumer_group_aggregate,
      ktm_float(event.get("[@metadata][kafka_append_time_aggregate]")),
      ktm_float(event.get("[@metadata][logstash_kafka_read_time_aggregate]"))
    ]
  )

  # Current wall-clock time scaled by 10^9, used as the point's timestamp.
  kafka_logstash_influx_metric_time = (Time.now.to_f * 1_000_000_000).to_i

  measurement = "kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=#{kafka_datacenter_producer}"
  producer_tags = "kafka_topic_producer=#{kafka_topic_producer},kafka_consumer_group_producer=#{kafka_consumer_group_producer}"
  aggregate_tags = "kafka_topic_aggregate=#{kafka_topic_aggregate},kafka_consumer_group_aggregate=#{kafka_consumer_group_aggregate}"

  if producer[:valid] && aggregate[:valid]
    # End-to-end lag: aggregate read time minus producer append time.
    kafka_total_lag_ms = aggregate[:read_time] - producer[:append_time]
    tags = "ktm_lag_type=complete,#{aggregate_tags},#{producer_tags}"
    fields = "kafka_total_lag_ms=#{kafka_total_lag_ms},kafka_aggregate_lag_ms=#{aggregate[:lag_ms]},kafka_producer_lag_ms=#{producer[:lag_ms]}"
  elsif producer[:valid]
    tags = "ktm_lag_type=producer,#{producer_tags}"
    fields = "kafka_producer_lag_ms=#{producer[:lag_ms]}"
  elsif aggregate[:valid]
    tags = "ktm_lag_type=aggregate,#{aggregate_tags}"
    fields = "kafka_aggregate_lag_ms=#{aggregate[:lag_ms]}"
  else
    @logger.error("Error kafkatimemachine: Could not build valid response --> #{producer[:error]}, #{aggregate[:error]}")
    # Nothing valid to report: stop here instead of handing a nil format
    # string to event.sprintf and emitting a meaningless payload downstream.
    return nil
  end

  influx_line_protocol = "#{measurement},#{tags} #{fields} #{kafka_logstash_influx_metric_time}"

  # Strict comparison kept on purpose: only a literal true enables the file log.
  file_output(influx_line_protocol) if @enable_log == true

  # NOTE(review): event.sprintf presumably expands %{field} references found in
  # the line — confirm against the Logstash Event API.
  @on_event.call(event, event.sprintf(influx_line_protocol))
end

# Internal helper for #encode: converts +value+ with Kernel#Float and returns
# nil when the value is nil or not numeric. Only the conversion errors
# (ArgumentError, TypeError) are rescued, so unrelated failures are not hidden.
def ktm_float(value)
  Float(value)
rescue ArgumentError, TypeError
  nil
end

# Internal helper for #encode: validates one metadata group and computes its lag.
#
# +stage+ is "producer" or "aggregate" (used only in log/error text) and
# +fields+ is [datacenter, topic, consumer_group, append_time, read_time].
# Returns { valid: false, error: String } when any entry is nil or empty,
# otherwise { valid: true, append_time:, read_time:, lag_ms: } where both times
# are truncated to integers and lag_ms is read_time - append_time.
def ktm_stage_lag(stage, fields)
  @logger.debug("kafka_#{stage}_array: #{fields}")

  if fields.any? { |value| value.nil? || value.to_s.empty? }
    @logger.debug("kafka_#{stage}_array invalid: Found null")
    return { valid: false, error: "Error in #{stage} data: #{fields}" }
  end

  @logger.debug("kafka_#{stage}_array valid")
  append_time = fields[3].to_i
  read_time = fields[4].to_i
  { valid: true, append_time: append_time, read_time: read_time, lag_ms: read_time - append_time }
end
file_output( output_line )
click to toggle source
# File lib/logstash/codecs/kafkatimemachine.rb, line 15
# Appends +output_line+ to /tmp/kafkatimemachine.txt while the file is at most
# 5242880 bytes. Once the file is larger than that, a single "maximum size"
# notice is appended (only if a line had been written successfully before, as
# tracked by @write_end_of_log) and later calls write nothing.
#
# @param output_line [String] text appended as one line
# @return [Boolean, nil] true after writing the line, false after writing the
#   size notice, nil when nothing was written
def file_output( output_line )
  # Size cap for the log file, in bytes.
  size_cap = 5242880

  File.open("/tmp/kafkatimemachine.txt", "a") do |log|
    if log.size > size_cap
      # Over the cap: emit the notice once, then stay silent.
      next unless @write_end_of_log == true

      log.puts("Maximum file size of #{size_cap} bytes reached; delete /tmp/kafkatimemachine.txt to resume writing")
      @write_end_of_log = false
    else
      log.puts(output_line)
      @write_end_of_log = true
    end
  end
end
register()
click to toggle source
# File lib/logstash/codecs/kafkatimemachine.rb, line 34
# Empty registration method: performs no work and returns nil.
def register; end