class LogStash::Codecs::AvroDataFile

Logstash Codec - Avro Data File

This plugin is used to process logstash events that represent Avro data files, like the S3 input can produce.

Options

Usage

input {

stdin { codec => 'avro-data-file' }

}

Attributes

tempfile[RW]
temporary_directory[R]

Public Instance Methods

decode(data) click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 40
def decode(data)
  merge(data)
end
encode(_event) click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 63
def encode(_event)
  raise 'Not implemented'
end
flush() { |event| ... } click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 44
def flush
  tempfile.flush
  return unless block_given?

  Avro::DataFile.open(tempfile.path, 'r') do |reader|
    schema = reader.datum_reader.writers_schema
    reader.each do |avro_message|
      event = LogStash::Event.new(avro_message)
      decorate_event(event, schema) if decorate_events?
      yield event
    end
  end
rescue => e
  puts e
  @logger.error('Avro parse error', error: e)
ensure
  reset
end
register() click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 34
def register
  require 'fileutils'
  FileUtils.mkdir_p(temporary_directory) unless Dir.exist?(temporary_directory)
  reset
end

Private Instance Methods

decorate_event(event, schema) click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 91
def decorate_event(event, schema)
  event.set('[@metadata][avro][type]', schema.type)
  if schema.is_a?(Avro::Schema::NamedSchema)
    event.set('[@metadata][avro][name]', schema.name)
    event.set('[@metadata][avro][namespace]', schema.namespace)
  end
end
decorate_events?() click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 87
def decorate_events?
  @decorate_events
end
merge(bytes) click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 72
def merge(bytes)
  tempfile.write(bytes)
end
reset() click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 76
def reset
  unless tempfile.nil?
    begin
      File.unlink(tempfile.path)
      tempfile.close
    rescue Errno::ENOENT # rubocop:disable Lint/HandleExceptions
    end
  end
  self.tempfile = Tempfile.create('', temporary_directory)
end