class LogStash::Codecs::Protobuf
This codec converts protobuf encoded messages into logstash events and vice versa.
Requires the protobuf definitions as ruby files. You can create those using the [ruby-protoc compiler](https://github.com/codekitchen/ruby-protocol-buffers).
The following shows a usage example for decoding protobuf 2 encoded events from a kafka stream:
Example configuration (ruby):
kafka {
zk_connect => "127.0.0.1" topic_id => "your_topic_goes_here" key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" codec => protobuf { class_name => "Animal::Unicorn" include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb'] }
}
Same example for protobuf 3:
Example configuration (ruby):
kafka {
zk_connect => "127.0.0.1" topic_id => "your_topic_goes_here" key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" codec => protobuf { class_name => "Animal.Unicorn" include_path => ['/path/to/protobuf/definitions/UnicornProtobuf_pb.rb'] protobuf_version => 3 }
}
Specifically for the kafka input: please set the deserializer classes as shown above.
Attributes
Public Instance Methods
# File lib/logstash/codecs/protobuf.rb, line 212 def decode(data) if @protobuf_version == 3 decoded = @pb_builder.decode(data.to_s) if @pb3_set_oneof_metainfo meta = pb3_get_oneof_metainfo(decoded, @class_name) end h = pb3_deep_to_hash(decoded) else decoded = @pb_builder.parse(data.to_s) h = decoded.to_hash end e = LogStash::Event.new(h) if @protobuf_version == 3 and @pb3_set_oneof_metainfo e.set("[@metadata][pb_oneof]", meta) end yield e if block_given? rescue => ex @logger.warn("Couldn't decode protobuf: #{ex.inspect}.") if stop_on_error raise ex else # keep original message so that the user can debug it. yield LogStash::Event.new("message" => data, "tags" => ["_protobufdecodefailure"]) end end
# File lib/logstash/codecs/protobuf.rb, line 238
# Encodes a Logstash event into protobuf wire format, dispatching to the
# pb3 or pb2 encoder depending on the configured protobuf_version, and
# hands non-empty results to the @on_event callback.
def encode(event)
  protobytes = @protobuf_version == 3 ? pb3_encode(event) : pb2_encode(event)
  @on_event.call(event, protobytes) unless protobytes.nil? || protobytes.empty?
end
Id of the pipeline this codec instance is running in; falls back to "main" when no execution context is available.
# File lib/logstash/codecs/protobuf.rb, line 158
# Returns the id of the pipeline this codec runs in, or "main" when no
# execution context is available (e.g. outside a running pipeline).
def pipeline_id
  if respond_to?(:execution_context) && !execution_context.nil?
    execution_context.pipeline_id
  else
    "main"
  end
end
# File lib/logstash/codecs/protobuf.rb, line 162 def register @metainfo_messageclasses = {} @metainfo_enumclasses = {} @metainfo_pb2_enumlist = [] @pb3_typeconversion_tag = "_protobuf_type_converted" if @include_path.length > 0 and not class_file.strip.empty? raise LogStash::ConfigurationError, "Cannot use `include_path` and `class_file` at the same time" end if @include_path.length == 0 and class_file.strip.empty? raise LogStash::ConfigurationError, "Need to specify `include_path` or `class_file`" end should_register = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).nil? unless @protobuf_root_directory.nil? or @protobuf_root_directory.strip.empty? if !$LOAD_PATH.include? @protobuf_root_directory and should_register $LOAD_PATH.unshift(@protobuf_root_directory) end end @class_file = "#{@protobuf_root_directory}/#{@class_file}" unless (Pathname.new @class_file).absolute? or @class_file.empty? # exclusive access while loading protobuf definitions Google::Protobuf::DescriptorPool.with_lock.synchronize do # load from `class_file` load_protobuf_definition(@class_file) if should_register and !@class_file.empty? # load from `include_path` include_path.each { |path| load_protobuf_definition(path) } if include_path.length > 0 and should_register if @protobuf_version == 3 @pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass else @pb_builder = pb2_create_instance(class_name) end end end
Pipelines using this plugin cannot be reloaded. See https://github.com/elastic/logstash/pull/6499.
The DescriptorPool instance registers the protobuf classes (and dependencies) as global objects. This makes it very difficult to reload a pipeline, because `class_name` and all of its dependencies are already registered.
# File lib/logstash/codecs/protobuf.rb, line 208
# Always false: this plugin registers protobuf classes globally in the
# DescriptorPool, so pipelines using it cannot be safely reloaded.
def reloadable?
  false
end
Private Instance Methods
# File lib/logstash/codecs/protobuf.rb, line 712
# Loads a compiled protobuf definition (.rb) file and runs the version-
# specific metadata analysis on it. Non-ruby files are skipped with a
# warning. Requiring needs an absolute path; relative paths rely on
# @protobuf_root_directory having been added to $LOAD_PATH in register().
def load_protobuf_definition(filename)
  if filename.end_with? ('.rb')
    # Add to the loading path of the protobuf definitions
    if (Pathname.new filename).absolute?
      begin
        require filename
      rescue Exception => e
        # Fix: the message previously contained the broken interpolation
        # "#(unknown)" instead of the actual filename.
        @logger.error("Unable to load file: #{filename}. Reason: #{e.inspect}")
        raise e
      end
    end
    if @protobuf_version == 3
      pb3_metadata_analyis(filename)
    else
      pb2_metadata_analyis(filename)
    end
  else
    @logger.warn("Not a ruby file: " + filename)
  end
end
# File lib/logstash/codecs/protobuf.rb, line 613
# Resolves a fully qualified class name like "Animal::Unicorn" to the
# actual class object by walking the constant chain from Object.
def pb2_create_instance(name)
  @logger.debug("Creating instance of " + name)
  name.split('::').reduce(Object) { |namespace, const| namespace.const_get(const) }
end
# File lib/logstash/codecs/protobuf.rb, line 563 def pb2_encode(event) data = pb2_prepare_for_encoding(event.to_hash, @class_name) msg = @pb_builder.new(data) msg.serialize_to_string rescue NoMethodError => e @logger.warn("Encoding error 2. Probably mismatching protobuf definition. Required fields in the protobuf definition are: " + event.to_hash.keys.join(", ") + " and the timestamp field name must not include a @. ") raise e rescue => e @logger.warn("Encoding error 1: #{e.inspect}") raise e end
# File lib/logstash/codecs/protobuf.rb, line 654 def pb2_metadata_analyis(filename) regex_class_start = /\s*set_fully_qualified_name \"(?<name>.+)\".*?/ regex_enum_name = /\s*include ..ProtocolBuffers..Enum\s*/ regex_pbdefs = /\s*(optional|repeated)(\s*):(?<type>.+),(\s*):(?<name>\w+),(\s*)(?<position>\d+)/ # now we also need to find out which class it contains and the protobuf definitions in it. # We'll unfortunately need that later so that we can create nested objects. class_name = "" type = "" field_name = "" is_enum_class = false File.readlines(filename).each do |line| if ! (line =~ regex_enum_name).nil? is_enum_class= true end if ! (line =~ regex_class_start).nil? class_name = $1.gsub('.',"::").split('::').map {|word| word.capitalize}.join('::') if is_enum_class @metainfo_pb2_enumlist << class_name.downcase end is_enum_class= false # reset when next class starts end if ! (line =~ regex_pbdefs).nil? type = $1 field_name = $2 if type =~ /::/ clean_type = type.gsub(/^:/,"") e = @metainfo_pb2_enumlist.include? clean_type.downcase if e if not @metainfo_enumclasses.key? class_name @metainfo_enumclasses[class_name] = {} end @metainfo_enumclasses[class_name][field_name] = clean_type else if not @metainfo_messageclasses.key? class_name @metainfo_messageclasses[class_name] = {} end @metainfo_messageclasses[class_name][field_name] = clean_type end end end end if class_name.nil? @logger.warn("Error 4: class name not found in file " + filename) raise ArgumentError, "Invalid protobuf file: " + filename end rescue LoadError => e raise ArgumentError.new("Could not load file: " + filename + ". Please try to use absolute pathes. Current working dir: " + Dir.pwd + ", loadpath: " + $LOAD_PATH.join(" ")) rescue => e @logger.warn("Error 3: unable to read pb definition from file " + filename+ ". Reason: #{e.inspect}. Last settings were: class #{class_name} field #{field_name} type #{type}. Backtrace: " + e.backtrace.inspect.to_s) raise e end
# File lib/logstash/codecs/protobuf.rb, line 576 def pb2_prepare_for_encoding(datahash, class_name) if datahash.is_a?(::Hash) # Preparation: the data cannot be encoded until certain criteria are met: # 1) remove @ signs from keys. # 2) convert timestamps and other objects to strings datahash = ::Hash[datahash.map{|(k,v)| [k.to_s.dup.gsub(/@/,''), (should_convert_to_string?(v) ? v.to_s : v)] }] # Check if any of the fields in this hash are protobuf classes and if so, create a builder for them. meta = @metainfo_messageclasses[class_name] if meta meta.map do | (k,c) | if datahash.include?(k) original_value = datahash[k] datahash[k] = if original_value.is_a?(::Array) # make this field an array/list of protobuf objects # value is a list of hashed complex objects, each of which needs to be protobuffed and # put back into the list. original_value.map { |x| pb2_prepare_for_encoding(x, c) } original_value else proto_obj = pb2_create_instance(c) proto_obj.new(pb2_prepare_for_encoding(original_value, c)) # this line is reached in the colourtest for an enum. Enums should not be instantiated. Should enums even be in the messageclasses? I dont think so! TODO bug end # if is array end # if datahash_include end # do end # if meta end datahash end
# File lib/logstash/codecs/protobuf.rb, line 467
# Appends a tag to the event's 'tags' field, creating the field as a
# one-element array when it does not exist yet.
def pb3_add_tag(event, tag )
  current_tags = event.get('tags')
  if current_tags.nil?
    event.set('tags', [tag])
  else
    event.set("tags", current_tags << tag)
  end
end
# File lib/logstash/codecs/protobuf.rb, line 376 def pb3_compare_datatypes(value, key, key_prefix, pb_class, expected_type) mismatches = [] if value.nil? is_mismatch = false else case value when ::Hash, Google::Protobuf::MessageExts is_mismatch = false descriptor = Google::Protobuf::DescriptorPool.generated_pool.lookup(pb_class).lookup(key) if !descriptor.subtype.nil? class_of_nested_object = pb3_get_descriptorpool_name(descriptor.subtype.msgclass) new_prefix = "#{key}." recursive_mismatches = pb3_get_type_mismatches(value, new_prefix, class_of_nested_object) mismatches.concat(recursive_mismatches) end when ::Array expected_type = pb3_get_expected_type(key, pb_class) is_mismatch = (expected_type != Google::Protobuf::RepeatedField) child_type = Google::Protobuf::DescriptorPool.generated_pool.lookup(pb_class).lookup(key).type value.each_with_index do | v, i | new_prefix = "#{key}." recursive_mismatches = pb3_compare_datatypes(v, i.to_s, new_prefix, pb_class, child_type) mismatches.concat(recursive_mismatches) is_mismatch |= recursive_mismatches.any? end # do else # is scalar data type is_mismatch = ! pb3_is_scalar_datatype_match(expected_type, value.class) end # if end # if value.nil? if is_mismatch mismatches << {"key" => "#{key_prefix}#{key}", "actual_type" => value.class, "expected_type" => expected_type, "value" => value} end mismatches end
Due to recursion on nested fields in the event object this method might be given an event (1st call) or a hash (2nd .. nth call) First call will be the event object, child objects will be hashes.
# File lib/logstash/codecs/protobuf.rb, line 478 def pb3_convert_mismatched_types(struct, mismatches) mismatches.each do | m | key = m['key'] expected_type = m['expected_type'] actual_type = m['actual_type'] if key.include? "." # the mismatch is in a child object levels = key.split(/\./) # key is something like http_user_agent.minor_version and needs to be splitted. key = levels[0] sub_levels = levels.drop(1).join(".") new_mismatches = [{"key"=>sub_levels, "actual_type"=>m["actual_type"], "expected_type"=>m["expected_type"]}] value = pb3_convert_mismatched_types_getter(struct, key) new_value = pb3_convert_mismatched_types(value, new_mismatches) struct = pb3_convert_mismatched_types_setter(struct, key, new_value ) else value = pb3_convert_mismatched_types_getter(struct, key) begin case expected_type.to_s when "Integer" case actual_type.to_s when "String" new_value = value.to_i when "Float" if value.floor == value # convert values like 2.0 to 2, but not 2.1 new_value = value.to_i end end when "String" new_value = value.to_s when "Float" new_value = value.to_f when "Boolean","TrueClass", "FalseClass" new_value = value.to_s.downcase == "true" end if !new_value.nil? struct = pb3_convert_mismatched_types_setter(struct, key, new_value ) end rescue Exception => ex @logger.debug("Protobuf encoding error 5: Could not convert types for protobuf encoding: #{ex}") end end # if key contains . end # mismatches.each struct end
# File lib/logstash/codecs/protobuf.rb, line 450
# Reads `key` from either a plain hash (nested field) or an event-like
# object exposing #get (top-level call).
def pb3_convert_mismatched_types_getter(struct, key)
  struct.is_a?(::Hash) ? struct[key] : struct.get(key)
end
# File lib/logstash/codecs/protobuf.rb, line 458
# Writes `value` under `key` into either a plain hash (nested field) or an
# event-like object exposing #set, returning the mutated struct.
def pb3_convert_mismatched_types_setter(struct, key, value)
  struct.is_a?(::Hash) ? struct[key] = value : struct.set(key, value)
  struct
end
# File lib/logstash/codecs/protobuf.rb, line 251 def pb3_deep_to_hash(input) case input when Google::Protobuf::MessageExts # it's a protobuf class result = Hash.new input.to_hash.each {|key, value| result[key] = pb3_deep_to_hash(value) # the key is required for the class lookup of enums. } when ::Array result = [] input.each {|value| result << pb3_deep_to_hash(value) } when ::Hash result = {} input.each {|key, value| result[key] = pb3_deep_to_hash(value) } when Symbol # is an Enum result = input.to_s.sub(':','') else result = input end result end
# File lib/logstash/codecs/protobuf.rb, line 276
# Serializes a Logstash event into protobuf 3 wire format via @pb_builder.
# Returns the encoded bytes, or nil when encoding failed (errors are
# logged and the event discarded; TypeErrors are delegated to
# pb3_handle_type_errors for optional auto-conversion and retry).
def pb3_encode(event)
  datahash = event.to_hash
  # Fix: `a = b and c` parses as `(a = b) and c`, so previously only the
  # nil-check was assigned and ANY tagged event was treated as a
  # conversion retry. Use && so the full condition is assigned.
  is_recursive_call = !event.get('tags').nil? && event.get('tags').include?(@pb3_typeconversion_tag)
  if is_recursive_call
    datahash = pb3_remove_typeconversion_tag(datahash)
  end
  datahash = pb3_prepare_for_encoding(datahash)
  if datahash.nil?
    @logger.warn("Protobuf encoding error 4: empty data for event #{event.to_hash}")
  end
  if @pb_builder.nil?
    @logger.warn("Protobuf encoding error 5: empty protobuf builder for class #{@class_name}")
  end
  pb_obj = @pb_builder.new(datahash)
  @pb_builder.encode(pb_obj)
rescue ArgumentError => e
  k = event.to_hash.keys.join(", ")
  @logger.warn("Protobuf encoding error 1: Argument error (#{e.inspect}). Reason: probably mismatching protobuf definition. \
Required fields in the protobuf definition are: #{k} and fields must not begin with @ sign. The event has been discarded.")
  nil
rescue TypeError => e
  pb3_handle_type_errors(event, e, is_recursive_call, datahash)
  nil
rescue => e
  @logger.warn("Protobuf encoding error 3: #{e.inspect}. Event discarded. Input data: #{datahash}. The event has been discarded. Backtrace: #{e.backtrace}")
  nil
end
# File lib/logstash/codecs/protobuf.rb, line 424
# Returns the fully qualified descriptor name of a protobuf message class,
# which is the key used for DescriptorPool lookups.
def pb3_get_descriptorpool_name(child_class)
  # instantiating gives access to the class's descriptor
  child_class.new.class.descriptor.name
end
# File lib/logstash/codecs/protobuf.rb, line 360 def pb3_get_expected_type(key, pb_class) pb_descriptor = Google::Protobuf::DescriptorPool.generated_pool.lookup(pb_class) if !pb_descriptor.nil? pb_builder = pb_descriptor.msgclass pb_obj = pb_builder.new({}) v = pb_obj.send(key) if !v.nil? v.class else nil end end end
# File lib/logstash/codecs/protobuf.rb, line 538 def pb3_get_oneof_metainfo(pb_object, pb_class_name) meta = {} pb_class = Google::Protobuf::DescriptorPool.generated_pool.lookup(pb_class_name).msgclass pb_class.descriptor.each_oneof { |field| field.each { | group_option | if !pb_object.send(group_option.name).nil? meta[field.name] = group_option.name end } } pb_class.descriptor.select{ |field| field.type == :message }.each { | field | # recurse over nested protobuf classes pb_sub_object = pb_object.send(field.name) if !pb_sub_object.nil? and !field.subtype.nil? pb_sub_class = pb3_get_descriptorpool_name(field.subtype.msgclass) meta[field.name] = pb3_get_oneof_metainfo(pb_sub_object, pb_sub_class) end } meta end
# File lib/logstash/codecs/protobuf.rb, line 349
# Walks every key/value pair of `data` and collects the type-mismatch
# records reported by pb3_compare_datatypes against the pb_class schema.
def pb3_get_type_mismatches(data, key_prefix, pb_class)
  data.to_hash.flat_map do |field, field_value|
    expected = pb3_get_expected_type(field, pb_class)
    pb3_compare_datatypes(field_value, field, key_prefix, pb_class, expected)
  end
end
# File lib/logstash/codecs/protobuf.rb, line 310
# Handles TypeErrors raised during pb3 encoding: logs diagnostics and, when
# pb3_encoder_autoconvert_types is enabled, performs one automatic type
# conversion pass and re-encodes. is_recursive_call marks the retry so
# conversion is attempted at most once. Returns nil unless @stop_on_error
# is set, in which case the error is re-raised.
def pb3_handle_type_errors(event, e, is_recursive_call, datahash)
  begin
    if is_recursive_call
      # Fix: `mismatches` was referenced here without ever being assigned in
      # this branch, which raised NameError (swallowed by the rescue below)
      # instead of logging the actual mismatches.
      mismatches = pb3_get_type_mismatches(datahash, "", @class_name)
      @logger.warn("Protobuf encoding error 2.1: Type error (#{e.inspect}). Some types could not be converted. The event has been discarded. Type mismatches: #{mismatches}.")
    else
      if @pb3_encoder_autoconvert_types
        msg = "Protobuf encoding error 2.2: Type error (#{e.inspect}). Will try to convert the data types. Original data: #{datahash}"
        @logger.warn(msg)
        mismatches = pb3_get_type_mismatches(datahash, "", @class_name)
        event = pb3_convert_mismatched_types(event, mismatches)
        # Add a (temporary) tag to handle the recursion stop
        pb3_add_tag(event, @pb3_typeconversion_tag )
        pb3_encode(event)
      else
        @logger.warn("Protobuf encoding error 2.3: Type error (#{e.inspect}). The event has been discarded. Try setting pb3_encoder_autoconvert_types => true for automatic type conversion.")
      end
    end
  rescue TypeError => e
    if @pb3_encoder_autoconvert_types
      @logger.warn("Protobuf encoding error 2.4.1: (#{e.inspect}). Failed to convert data types. The event has been discarded. original data: #{datahash}")
    else
      @logger.warn("Protobuf encoding error 2.4.2: (#{e.inspect}). The event has been discarded.")
    end
    if @stop_on_error
      raise e
    end
    nil
  rescue => ex
    @logger.warn("Protobuf encoding error 2.5: (#{e.inspect}). The event has been discarded. Auto-typecasting was on: #{@pb3_encoder_autoconvert_types}")
    if @stop_on_error
      raise ex
    end
    nil
  end
end
# File lib/logstash/codecs/protobuf.rb, line 431
# Tells whether a scalar ruby value class is acceptable for the expected
# protobuf field type. Integers are valid for float fields; otherwise the
# (case-insensitive) class names must match exactly.
def pb3_is_scalar_datatype_match(expected_type, actual_type)
  return true if expected_type == actual_type

  expected = expected_type.to_s.downcase.to_sym
  actual = actual_type.to_s.downcase.to_sym
  case expected
  when :string, :integer
    actual == expected
  when :float
    # ints are implicitly convertible to floats
    actual == :float || actual == :integer
  end
end
# File lib/logstash/codecs/protobuf.rb, line 619 def pb3_metadata_analyis(filename) regex_class_name = /\s*add_message "(?<name>.+?)" do\s+/ # TODO optimize both regexes for speed (negative lookahead) regex_pbdefs = /\s*(optional|repeated)(\s*):(?<name>.+),(\s*):(?<type>\w+),(\s*)(?<position>\d+)(, \"(?<enum_class>.*?)\")?/ class_name = "" type = "" field_name = "" File.readlines(filename).each do |line| if ! (line =~ regex_class_name).nil? class_name = $1 @metainfo_messageclasses[class_name] = {} @metainfo_enumclasses[class_name] = {} end # if if ! (line =~ regex_pbdefs).nil? field_name = $1 type = $2 field_class_name = $4 if type == "message" @metainfo_messageclasses[class_name][field_name] = field_class_name elsif type == "enum" @metainfo_enumclasses[class_name][field_name] = field_class_name end end # if end # readlines if class_name.nil? @logger.warn("Error 4: class name not found in file " + filename) raise ArgumentError, "Invalid protobuf file: " + filename end rescue Exception => e @logger.warn("Error 3: unable to read pb definition from file " + filename+ ". Reason: #{e.inspect}. Last settings were: class #{class_name} field #{field_name} type #{type}. Backtrace: " + e.backtrace.inspect.to_s) raise e end
# File lib/logstash/codecs/protobuf.rb, line 522
# Recursively prepares an event hash for protobuf 3 encoding:
# drops nil-valued fields, strips '@' from keys and symbolizes them,
# stringifies values the protobuf library cannot take directly, and
# recurses into nested hashes.
def pb3_prepare_for_encoding(datahash)
  # 0) Remove empty fields.
  without_nils = datahash.reject { |_key, value| value.nil? }

  # 1) remove @ signs from keys and symbolize them;
  # 2) convert timestamps and other objects to strings.
  prepared = {}
  without_nils.each do |key, value|
    prepared[key.gsub(/@/, '').to_sym] = should_convert_to_string?(value) ? value.to_s : value
  end

  # Recurse into nested structures.
  prepared.each do |key, value|
    prepared[key] = pb3_prepare_for_encoding(value) if value.is_a?(Hash)
  end
  prepared
end
# File lib/logstash/codecs/protobuf.rb, line 414
# Strips the internal type-conversion tag from the event data (the
# protobuf definition might not have a field for tags), removing the
# 'tags' entry entirely when it becomes empty.
def pb3_remove_typeconversion_tag(data)
  tags = data['tags']
  tags.delete(@pb3_typeconversion_tag)
  data.delete('tags') if tags.empty?
  data
end
# File lib/logstash/codecs/protobuf.rb, line 608
# True when a value must be stringified before protobuf encoding, i.e. it
# is none of the types the encoder accepts natively (numbers, hashes,
# arrays, booleans).
def should_convert_to_string?(v)
  return false if v.is_a?(Integer) || v.is_a?(Float)
  return false if v.is_a?(::Hash) || v.is_a?(::Array)
  return false if [true, false].include?(v)

  true
end