class Fluent::Plugin::AvroParser
Constants
- MAGIC_BYTE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/parser_avro.rb, line 48
# Validates the schema-related settings and builds the Avro datum reader.
#
# Exactly one of three setups is selected, in this order:
#   1. separate writer's and reader's schemas (each from a file or inline JSON),
#   2. a schema taken from the registry described by @avro_registry,
#   3. a single schema from a file, a URL, or inline JSON.
#
# conf is handed to the superclass implementation via the bare `super`.
# Raises Fluent::ConfigError when a group of mutually exclusive schema
# settings does not contain exactly one value.
def configure(conf)
  super

  writers_given = ![@writers_schema_file, @writers_schema_json].compact.empty?
  readers_given = ![@readers_schema_file, @readers_schema_json].compact.empty?

  # Prefer the file when set, otherwise fall back to the inline definition.
  load_raw = lambda do |path, inline|
    if path
      File.read(path)
    elsif inline
      inline
    end
  end

  if writers_given && readers_given
    if [@writers_schema_json, @writers_schema_file].compact.size != 1
      raise Fluent::ConfigError, "writers_schema_json, writers_schema_file is required, but they cannot specify at the same time!"
    end
    if [@readers_schema_json, @readers_schema_file].compact.size != 1
      raise Fluent::ConfigError, "readers_schema_json, readers_schema_file is required, but they cannot specify at the same time!"
    end

    @writers_raw_schema = load_raw.call(@writers_schema_file, @writers_schema_json)
    @readers_raw_schema = load_raw.call(@readers_schema_file, @readers_schema_json)
    @writers_schema = Avro::Schema.parse(@writers_raw_schema)
    @readers_schema = Avro::Schema.parse(@readers_raw_schema)
    @reader = Avro::IO::DatumReader.new(@writers_schema, @readers_schema)
  elsif @avro_registry
    registry = @avro_registry
    @confluent_registry = Fluent::Plugin::ConfluentAvroSchemaRegistry.new(registry.url, @api_key, @api_secret)
    @raw_schema = @confluent_registry.subject_version(registry.subject, registry.schema_key, registry.schema_version)
    @schema = Avro::Schema.parse(@raw_schema)
    @reader = Avro::IO::DatumReader.new(@schema)
  else
    if [@schema_json, @schema_file, @schema_url].compact.size != 1
      raise Fluent::ConfigError, "schema_json, schema_file, or schema_url is required, but they cannot specify at the same time!"
    end

    @raw_schema =
      if @schema_file
        File.read(@schema_file)
      elsif @schema_url
        fetch_schema(@schema_url, @schema_url_key)
      elsif @schema_json
        @schema_json
      end
    @schema = Avro::Schema.parse(@raw_schema)
    @reader = Avro::IO::DatumReader.new(@schema)
  end
end
fetch_schema(url, schema_key)
click to toggle source
# File lib/fluent/plugin/parser_avro.rb, line 165
# Downloads a raw schema over HTTP or HTTPS.
#
# url        - String location of the schema; TLS is used when the scheme is "https".
# schema_key - when nil the response body is returned untouched; otherwise the
#              body is parsed with Yajl and the value stored under this key is returned.
#
# Basic authentication is sent only when both @api_key and @api_secret are set.
# The response status is not inspected here.
def fetch_schema(url, schema_key)
  uri = URI.parse(url)
  response = Net::HTTP.start(uri.host, uri.port, :use_ssl => (uri.scheme == "https")) do |http|
    # request_uri keeps the query string and yields "/" for a URL without a
    # path; uri.path would drop the query and can be "", which Net::HTTP rejects.
    request = Net::HTTP::Get.new(uri.request_uri)
    request.basic_auth(@api_key, @api_secret) if @api_key && @api_secret
    http.request(request)
  end

  if schema_key.nil?
    response.body
  else
    Yajl.load(response.body)[schema_key]
  end
end
parse(data) { |time, record| ... }
click to toggle source
# File lib/fluent/plugin/parser_avro.rb, line 103
# Decodes one Avro binary payload and yields the resulting time and record.
#
# data - String holding the encoded payload. When @use_confluent_schema or
#        @avro_registry is set, it must begin with the framing described on
#        read_confluent_header.
#
# Yields time, record as returned by convert_values.
#
# If decoding (or the yielded block) raises EOFError or RuntimeError and
# exactly one of @schema_url / @avro_registry is configured, the schema is
# fetched again. When the fetched raw schema differs from the cached
# @raw_schema, the payload is decoded once more with a reader built from the
# new schema; otherwise the original error is re-raised.
def parse(data)
  decoder = Avro::IO::BinaryDecoder.new(StringIO.new(data))
  begin
    # schema_id stays nil when the header itself cannot be read.
    schema_id = read_confluent_header(decoder) if @use_confluent_schema || @avro_registry
    decoded_data = @reader.read(decoder)
    time, record = convert_values(parse_time(decoded_data), decoded_data)
    yield time, record
  rescue EOFError, RuntimeError => e
    # Without a single remote schema source there is nothing to refresh.
    raise e unless [@schema_url, @avro_registry].compact.size == 1

    begin
      new_raw_schema = if @schema_url
                         fetch_schema(@schema_url, @schema_url_key)
                       elsif @avro_registry
                         @confluent_registry.schema_with_id(schema_id, @avro_registry.schema_key)
                       end
      new_schema = Avro::Schema.parse(new_raw_schema)
      is_changed = (new_raw_schema != @raw_schema)
      @raw_schema = new_raw_schema
      @schema = new_schema
    rescue EOFError, RuntimeError
      # Refreshing is best-effort: is_changed stays nil, so the original
      # error is re-raised just below.
    end

    raise e unless is_changed

    # Start over on a fresh decoder so the retry reads from the first byte.
    decoder = Avro::IO::BinaryDecoder.new(StringIO.new(data))
    read_confluent_header(decoder) if @use_confluent_schema || @avro_registry
    @reader = Avro::IO::DatumReader.new(@schema)
    decoded_data = @reader.read(decoder)
    time, record = convert_values(parse_time(decoded_data), decoded_data)
    yield time, record
  end
end

# Consumes the Confluent framing from decoder and returns the schema id.
#
#   MAGIC_BYTE | schema_id | record
#   ----------:|:---------:|:---------------
#   1byte      | 4bytes    | record contents
#
# Raises RuntimeError when the first byte is not MAGIC_BYTE.
private def read_confluent_header(decoder)
  magic_byte = decoder.read(1)
  if magic_byte != MAGIC_BYTE
    raise "The first byte should be magic byte but got #{magic_byte.inspect}"
  end
  decoder.read(4).unpack("N").first
end
parser_type()
click to toggle source
# File lib/fluent/plugin/parser_avro.rb, line 99
# Reports the kind of input this parser consumes; always the symbol :binary.
def parser_type
  :binary
end