class LogStash::Codecs::BulkEs
This codec decodes incoming messages in the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk format] into individual events, and stores the bulk action metadata in the `@metadata` field.
Encoding is not supported.
Public Instance Methods
decode(data) { |event| ... }
click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 30
#
# Decodes Elasticsearch bulk-format data into events.
#
# +data+ is handed to the line codec in @lines; every line it produces is
# parsed as JSON and run through a two-state machine kept in @state:
#
# * :init     - the line is an action line such as {"index": {...}}. The
#               value under its first key becomes @metadata and the key
#               itself is stored as @metadata["action"]. A "delete" action
#               has no source line, so an empty event is yielded at once
#               and the state stays :init.
# * :metadata - the line is the source/payload for the pending action. For
#               "update" the event is built from line["doc"] when present
#               (otherwise from the whole line), and "doc_as_upsert" /
#               "upsert" are copied into @metadata ("upsert" re-serialized
#               as a JSON string). For any other action the whole line
#               becomes the event.
#
# Every yielded event carries the action metadata in its "@metadata" field.
# A line that fails JSON parsing is reported through log_failure and
# skipped; @state is left as it was.
#
# @param data the raw input chunk, passed through to @lines.decode
# @yield [event] one LogStash::Event per completed bulk action
def decode(data)
  @lines.decode(data) do |bulk_message|
    begin
      line = LogStash::Json.load(bulk_message.get("message"))
      case @state
      when :metadata
        if @metadata["action"] == 'update'
          if line.key?("doc")
            event = LogStash::Event.new(line["doc"])
            @metadata["doc_as_upsert"] = line["doc_as_upsert"] if line.key?("doc_as_upsert")
          else
            event = LogStash::Event.new(line)
          end
          @metadata["upsert"] = LogStash::Json.dump(line["upsert"]) if line.key?("upsert")
        else
          # Any non-update action that reaches this state (index, create).
          # This used to be a condition-less `elsif` that silently consumed
          # the assignment below as its condition.
          event = LogStash::Event.new(line)
        end
        event.set("@metadata", @metadata)
        yield event
        @state = :init
      when :init
        action = line.keys.first
        @metadata = line[action]
        @metadata["action"] = action.to_s
        @state = :metadata
        if @metadata["action"] == 'delete'
          # Delete actions are not followed by a source line.
          event = LogStash::Event.new
          event.set("@metadata", @metadata)
          yield event
          @state = :init
        end
      end
    rescue LogStash::Json::ParserError => e
      log_failure(
        "messages must be in UTF-8 JSON format",
        :error => e,
        :data => data
      )
    end
  end
end
encode(data)
click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 78
#
# Encoding is not supported by this codec; calling it always fails.
#
# @param data ignored
# @raise [RuntimeError] always, with the message "Not implemented"
def encode(data)
  raise RuntimeError, "Not implemented"
end
register()
click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 23
#
# Sets up the codec's initial state: a line codec with its charset set to
# UTF-8 (stored in @lines), the parser state :init, and an empty metadata
# hash.
def register
  @lines = LogStash::Codecs::Line.new.tap { |line_codec| line_codec.charset = "UTF-8" }
  @state = :init
  @metadata = {}
end
Private Instance Methods
log_failure(message, opts)
click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 18
#
# Reports a failure at error level through @logger, prefixing the message
# with a fixed "[BulkES Json Parse Failure]" tag.
#
# @param message [String] description of the failure
# @param opts extra details, passed to the logger unchanged
def log_failure(message, opts)
  tagged = "[BulkES Json Parse Failure] #{message}"
  @logger.error(tagged, opts)
end