class Fluent::Plugin::AvroParser

Constants

MAGIC_BYTE

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/parser_avro.rb, line 48
# Configure the parser and build the Avro datum reader (+@reader+).
#
# The schema source is chosen from three mutually exclusive modes, tried in
# this order:
# 1. Writer + reader schemas: some writers_schema_* AND some readers_schema_*
#    option is set. Each side must use exactly one of *_file / *_json, and
#    the DatumReader is built from both parsed schemas.
# 2. avro_registry: the schema is fetched from the registry by subject,
#    schema_key and schema_version.
# 3. Otherwise exactly one of schema_json / schema_file / schema_url is used.
#
# @param conf the configuration handed on to the superclass.
# @raise [Fluent::ConfigError] when a schema source is missing or ambiguous.
def configure(conf)
  super

  # Contents of +file+ when a path is given, else the inline +json+ (or nil).
  read_schema_source = lambda do |file, json|
    if file
      File.read(file)
    elsif json
      json
    end
  end

  has_writers_schema = !(@writers_schema_file.nil? && @writers_schema_json.nil?)
  has_readers_schema = !(@readers_schema_file.nil? && @readers_schema_json.nil?)

  if has_writers_schema && has_readers_schema
    if [@writers_schema_json, @writers_schema_file].compact.size != 1
      raise Fluent::ConfigError, "writers_schema_json, writers_schema_file is required, but they cannot specify at the same time!"
    end
    if [@readers_schema_json, @readers_schema_file].compact.size != 1
      raise Fluent::ConfigError, "readers_schema_json, readers_schema_file is required, but they cannot specify at the same time!"
    end

    @writers_raw_schema = read_schema_source.call(@writers_schema_file, @writers_schema_json)
    @readers_raw_schema = read_schema_source.call(@readers_schema_file, @readers_schema_json)

    @writers_schema = Avro::Schema.parse(@writers_raw_schema)
    @readers_schema = Avro::Schema.parse(@readers_raw_schema)
    @reader = Avro::IO::DatumReader.new(@writers_schema, @readers_schema)
  elsif @avro_registry
    registry = @avro_registry
    @confluent_registry = Fluent::Plugin::ConfluentAvroSchemaRegistry.new(registry.url, @api_key, @api_secret)
    @raw_schema = @confluent_registry.subject_version(registry.subject,
                                                      registry.schema_key,
                                                      registry.schema_version)
    @schema = Avro::Schema.parse(@raw_schema)
    @reader = Avro::IO::DatumReader.new(@schema)
  else
    if [@schema_json, @schema_file, @schema_url].compact.size != 1
      raise Fluent::ConfigError, "schema_json, schema_file, or schema_url is required, but they cannot specify at the same time!"
    end

    # Exactly one source is set at this point; file wins, then URL, then JSON.
    @raw_schema = case
                  when @schema_file then File.read(@schema_file)
                  when @schema_url then fetch_schema(@schema_url, @schema_url_key)
                  when @schema_json then @schema_json
                  end
    @schema = Avro::Schema.parse(@raw_schema)
    @reader = Avro::IO::DatumReader.new(@schema)
  end
end
fetch_schema(url, schema_key)
# File lib/fluent/plugin/parser_avro.rb, line 165
# Download a schema document with an HTTP GET.
#
# @param url [String] schema location; TLS is used when the scheme is "https".
# @param schema_key [String, nil] when nil, the raw response body is returned;
#   otherwise the body is parsed as JSON (Yajl) and the value stored under
#   this key is returned.
# @return the response body, or the value extracted under +schema_key+.
#
# HTTP basic auth is sent only when both @api_key and @api_secret are set.
# NOTE(review): the HTTP status code is not checked, so an error page body is
# returned as if it were a schema; presumably the caller's schema parsing
# surfaces that — confirm whether an explicit status check is wanted.
def fetch_schema(url, schema_key)
  uri = URI.parse(url)
  response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |http|
    # request_uri keeps the query string and yields "/" for an empty path;
    # uri.path would drop "?..." and be "" (which Net::HTTP rejects) for
    # URLs such as "http://host".
    request = Net::HTTP::Get.new(uri.request_uri)
    request.basic_auth(@api_key, @api_secret) if @api_key && @api_secret
    http.request(request)
  end
  return response.body if schema_key.nil?

  Yajl.load(response.body)[schema_key]
end
parse(data) { |time, record| ... }
# File lib/fluent/plugin/parser_avro.rb, line 103
# Decode one Avro-encoded datum from +data+ and yield it as (time, record).
#
# When use_confluent_schema or avro_registry is configured, +data+ is
# expected to carry this header:
#
#   MAGIC_BYTE | schema_id | record
#   ----------:|:---------:|:---------------
#    1byte     |  4bytes   | record contents
#
# If decoding raises EOFError/RuntimeError and exactly one of schema_url /
# avro_registry is configured, the schema is re-fetched once. When it
# differs from the cached one, +data+ is decoded again with a reader built
# from the new schema. Otherwise the original error is re-raised.
#
# Only decoding sits inside that retry. Time/value conversion and the
# caller's block run outside it, so an exception raised by the block is not
# mistaken for a schema mismatch (which would re-fetch the schema and yield
# the same record a second time).
#
# @yieldparam time the value returned by convert_values for the time.
# @yieldparam record the converted record.
def parse(data)
  schema_id = nil
  begin
    decoder, schema_id = open_avro_decoder(data)
    decoded_data = @reader.read(decoder)
  rescue EOFError, RuntimeError => e
    raise e unless [@schema_url, @avro_registry].compact.size == 1
    raise e unless refresh_avro_schema(schema_id)

    # Errors from this second attempt propagate; there is no further retry.
    decoder = open_avro_decoder(data).first
    @reader = Avro::IO::DatumReader.new(@schema)
    decoded_data = @reader.read(decoder)
  end
  time, record = convert_values(parse_time(decoded_data), decoded_data)
  yield time, record
end

# Wrap +data+ in a BinaryDecoder and, when the Confluent header is expected,
# consume and validate it. Returns [decoder, schema_id]; schema_id is nil
# when no header is expected.
# @raise [RuntimeError] when the first byte is not MAGIC_BYTE.
private def open_avro_decoder(data)
  decoder = Avro::IO::BinaryDecoder.new(StringIO.new(data))
  return [decoder, nil] unless @use_confluent_schema || @avro_registry

  magic_byte = decoder.read(1)
  if magic_byte != MAGIC_BYTE
    raise "The first byte should be magic byte but got #{magic_byte.inspect}"
  end
  [decoder, decoder.read(4).unpack("N").first]
end

# Re-fetch the schema from schema_url (or from the registry by +schema_id+),
# replace @raw_schema/@schema, and return true when the raw text changed.
# EOFError/RuntimeError during the fetch or parse are swallowed (returns
# false) so the caller re-raises its original decoding error instead.
private def refresh_avro_schema(schema_id)
  new_raw_schema = if @schema_url
                     fetch_schema(@schema_url, @schema_url_key)
                   elsif @avro_registry
                     @confluent_registry.schema_with_id(schema_id,
                                                        @avro_registry.schema_key)
                   end
  new_schema = Avro::Schema.parse(new_raw_schema)
  changed = (new_raw_schema != @raw_schema)
  @raw_schema = new_raw_schema
  @schema = new_schema
  changed
rescue EOFError, RuntimeError
  false
end
parser_type()
# File lib/fluent/plugin/parser_avro.rb, line 99
# Parser type reported by this plugin: always the symbol :binary
# (#parse reads its input as raw bytes through a StringIO).
def parser_type; :binary; end