class LogStash::Codecs::AvroSchemaRegistry

Logstash Codec - Avro Schema Registry

This plugin is used to serialize Logstash events as Avro datums, as well as to deserialize Avro datums into Logstash events.

Decode/encode Avro records as Logstash events using the associated Avro schema from a Confluent schema registry (https://github.com/confluentinc/schema-registry).

Decoding (input)

When this codec is used to decode the input, you may pass the following options:

If the input stream is binary encoded, you should use the "ByteArrayDeserializer" in the Kafka input config.

Encoding (output)

This codec uses the Confluent schema registry to register a schema and encode the data in Avro using schema_id lookups.

When this codec is used to encode, you may pass the following options:

Usage

Example usage with Kafka input and output.

source,ruby

input {

kafka {
  ...
  codec => avro_schema_registry {
    endpoint => "http://schemas.example.com"
  }
  value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}

} filter {

...

} output {

kafka {
  ...
  codec => avro_schema_registry {
    endpoint => "http://schemas.example.com"
    subject_name => "my_kafka_subject_name"
    schema_uri => "/app/my_kafka_subject.avsc"
    register_schema => true
  }
  value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}

}


Using a signed certificate for registry authentication

source,ruby

output {

kafka {
  ...
  codec => avro_schema_registry {
    endpoint => "http://schemas.example.com"
    schema_id => 47
    client_key          => "./client.key"
    client_certificate  => "./client.crt"
    ca_certificate      => "./ca.pem"
    verify_mode         => "verify_peer"
  }
  value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}

}


Constants

EXCLUDE_ALWAYS

Public Instance Methods

clean_event(event) click to toggle source
# File lib/logstash/codecs/avro_schema_registry.rb, line 214
# Produces the hash form of an event with the always-excluded fields removed.
#
# The hash comes from +event.to_hash+; every key that appears in
# EXCLUDE_ALWAYS is deleted from that same hash object before it is returned.
#
# @param event [#to_hash] the event to convert
# @return [Hash] the hash returned by +event.to_hash+, minus the excluded keys
def clean_event(event)
  # delete_if (unlike reject!) always returns the receiver, so the pruned
  # hash itself is the method's result.
  event.to_hash.delete_if { |field, _value| EXCLUDE_ALWAYS.include?(field) }
end
decode(data) { |event(read)| ... } click to toggle source
# File lib/logstash/codecs/avro_schema_registry.rb, line 221
# Decodes one schema-registry framed Avro message and yields it as an event.
#
# The input may be either the raw bytes or a strict Base64 rendering of them.
# The payload must start with five header bytes: a one-byte magic marker
# (compared against MAGIC_BYTE) followed by a big-endian unsigned integer
# schema id. The remainder is read as an Avro datum using the schema returned
# by +get_schema+.
#
# Messages that are too short, or whose first byte is not MAGIC_BYTE, are
# logged via @logger and produce no event.
#
# @param data [String] raw or Base64-encoded message
# @yield [LogStash::Event] the decoded event, or, when +tag_on_failure+ is
#   truthy and decoding raised, an event carrying the original data in
#   "message" and the "_avroparsefailure" tag
# @raise [StandardError] re-raises the decoding error when +tag_on_failure+
#   is falsy
def decode(data)
  # Try strict Base64 first. Only an invalid-Base64 ArgumentError means
  # "treat the input as raw bytes"; any other error falls through to the
  # method-level rescue below instead of being silently swallowed.
  payload = begin
    Base64.strict_decode64(data)
  rescue ArgumentError
    data
  end

  # The size check has to look at the decoded payload: Base64 text can be
  # five or more characters long and still decode to fewer than the five
  # header bytes, which would leave the schema id nil after unpacking.
  return @logger.error('message is too small to decode') if payload.bytesize < 5

  datum = StringIO.new(payload)
  magic_byte, schema_id = datum.read(5).unpack("cI>")
  return @logger.error('message does not start with magic byte') if magic_byte != MAGIC_BYTE

  schema = get_schema(schema_id)
  decoder = Avro::IO::BinaryDecoder.new(datum)
  datum_reader = Avro::IO::DatumReader.new(schema)
  yield LogStash::Event.new(datum_reader.read(decoder))
rescue => e
  # Bare raise re-raises the exception currently being handled.
  raise unless tag_on_failure

  @logger.error("Avro parse error, original data now in message field", :error => e)
  yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
end
encode(event) click to toggle source
# File lib/logstash/codecs/avro_schema_registry.rb, line 246
# Serializes an event as a framed Avro message and hands it to @on_event.
#
# The output starts with MAGIC_BYTE and the write schema id packed as a
# big-endian unsigned integer, followed by the Avro encoding of
# +clean_event(event)+. The write schema id is looked up once through
# +get_write_schema_id+ and memoized in @write_schema_id.
#
# Output form, in priority order:
# * @binary_encoded - result of +to_java_bytes+ on the framed string
#   (NOTE(review): +to_java_bytes+ looks like a JRuby String extension; confirm)
# * @base64_encoded - strict Base64 text of the framed string
# * otherwise       - the framed string itself
#
# @param event [#to_hash] the event to encode
# @return whatever +@on_event.call+ returns
def encode(event)
  @write_schema_id ||= get_write_schema_id
  writer = Avro::IO::DatumWriter.new(get_schema(@write_schema_id))

  # Header first: magic byte, then the schema id.
  out = StringIO.new
  out.write(MAGIC_BYTE.chr)
  out.write([@write_schema_id].pack("I>"))

  # Avro body is appended to the same buffer.
  avro_encoder = Avro::IO::BinaryEncoder.new(out)
  writer.write(clean_event(event), avro_encoder)

  framed = out.string
  payload =
    if @binary_encoded
      framed.to_java_bytes
    elsif @base64_encoded
      Base64.strict_encode64(framed)
    else
      framed
    end

  @on_event.call(event, payload)
end
get_schema(schema_id) click to toggle source
# File lib/logstash/codecs/avro_schema_registry.rb, line 158
# Returns the parsed Avro schema for a schema id, caching it in @schemas.
#
# On the first request for an id the schema is fetched through
# +@client.schema+ and parsed with +Avro::Schema.parse+; later requests for
# the same id are served from @schemas without calling the client.
#
# @param schema_id the registry id of the schema
# @return the cached value for +schema_id+
def get_schema(schema_id)
  # fetch only runs the block when the key is absent, so an id that is
  # already cached is returned as-is, whatever its stored value.
  @schemas.fetch(schema_id) do
    @schemas[schema_id] = Avro::Schema.parse(@client.schema(schema_id))
  end
end
get_write_schema_id() click to toggle source
# File lib/logstash/codecs/avro_schema_registry.rb, line 175
# Works out which schema id to use when encoding.
#
# Resolution order:
# 1. @schema_id, when set, is returned directly.
# 2. Otherwise @subject_name is required; when it is nil an error is logged
#    and the logger call's result is returned (nothing is raised).
# 3. With @schema_version set, that version of the subject is loaded.
# 4. Otherwise the schema JSON from +load_schema_json+ is used: optionally
#    checked for compatibility (@check_compatibility), optionally registered
#    when not yet registered (@register_schema), then looked up with
#    +verify_schema+.
#
# Note that an incompatible schema is only logged; processing continues.
#
# @return the resolved schema id (or the logger's return value in case 2)
def get_write_schema_id
  return @schema_id if @schema_id
  return @logger.error('requires a subject_name') if @subject_name.nil?

  subject = @client.subject(@subject_name)

  schema =
    if @schema_version.nil?
      schema_json = load_schema_json

      # Incompatibility is reported but does not stop the lookup.
      if @check_compatibility && !subject.compatible?(schema_json)
        @logger.error('the schema json is not compatible with the subject. you should fix your schema or change the compatibility level.')
      end

      if @register_schema && !subject.schema_registered?(schema_json)
        subject.register_schema(schema_json)
      end

      subject.verify_schema(schema_json)
    else
      subject.version(@schema_version)
    end

  schema.id
end
load_schema_json() click to toggle source
# File lib/logstash/codecs/avro_schema_registry.rb, line 165
# Returns the schema JSON text configured for this codec.
#
# @schema_uri takes precedence: it is opened with +open+ and its content is
# read. Otherwise @schema_string is returned. When neither is set an error is
# logged and the logger call's result is returned.
#
# @return [String] the schema JSON (or the logger's return value when no
#   source is configured)
def load_schema_json
  if @schema_uri
    # Block form so the opened handle is closed as soon as the content has
    # been read, instead of being left open until garbage collection.
    open(@schema_uri) { |io| io.read }
  elsif @schema_string
    @schema_string
  else
    @logger.error('you must supply a schema_uri or schema_string in the config')
  end
end
register() click to toggle source
# File lib/logstash/codecs/avro_schema_registry.rb, line 142
# Sets up the codec's state: the registry client, an empty schema cache and
# an unset write schema id.
#
# Without a client certificate the client is built from +endpoint+,
# +username+ and +password+ only. With one, the connection options built from
# the certificate, key, CA certificate and verify mode are passed as a fourth
# argument.
def register
  @client =
    if client_certificate.nil?
      SchemaRegistry::Client.new(endpoint, username, password)
    else
      SchemaRegistry::Client.new(
        endpoint,
        username,
        password,
        SchemaRegistry::Client.connection_options(
          client_certificate: client_certificate,
          client_key: client_key,
          ca_certificate: ca_certificate,
          verify_mode: verify_mode
        )
      )
    end

  # Parsed schemas keyed by id, filled by get_schema.
  @schemas = {}
  # Resolved on the first encode call.
  @write_schema_id = nil
end