class LogStash::Codecs::BulkEs

This codec decodes incoming messages in www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk format] into individual events, and stores the metadata of each bulk action in the `@metadata` field.

Encoding is not supported.

Public Instance Methods

decode(data) { |event| ... } click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 30
# Decodes a chunk of Elasticsearch bulk-format text into events.
#
# +data+ is passed to the line codec held in +@lines+. The "message" field of
# every line event it yields is parsed as JSON and run through a two-state
# machine kept in +@state+:
#
# * +:init+     - the line is treated as an action line. Its first key is the
#   action name and the value under that key becomes +@metadata+ (with the
#   action name added under "action"). A "delete" action yields an empty event
#   immediately; any other action switches to +:metadata+.
# * +:metadata+ - the line is treated as the body of the pending action. For
#   "update" the event is built from the "doc" member when present (otherwise
#   from the whole line), and "doc_as_upsert" / "upsert" are copied into
#   +@metadata+ ("upsert" as a JSON string). For every other action the event
#   is built from the whole line.
#
# Each yielded event carries the current +@metadata+ under "@metadata".
# Lines that are not valid JSON are reported through +log_failure+ and dropped.
#
# @param data the raw input handed to the line codec
# @yield [event] one LogStash::Event per completed bulk action
def decode(data)
  @lines.decode(data) do |bulk_message|
    begin
      line = LogStash::Json.load(bulk_message.get("message"))
      case @state
      when :metadata
        if @metadata["action"] == 'update'
          if line.key?("doc")
            event = LogStash::Event.new(line["doc"])
            @metadata["doc_as_upsert"] = line["doc_as_upsert"] if line.key?("doc_as_upsert")
          else
            event = LogStash::Event.new(line)
          end

          # The upsert document is kept as a JSON string inside the metadata.
          @metadata["upsert"] = LogStash::Json.dump(line["upsert"]) if line.key?("upsert")
        else
          # Remaining actions that carry a body: index or create.
          event = LogStash::Event.new(line)
        end

        event.set("@metadata", @metadata)
        yield event
        @state = :init
      when :init
        action = line.keys.first
        @metadata = line[action]
        @metadata["action"] = action.to_s
        @state = :metadata
        if @metadata["action"] == 'delete'
          # A delete has no body line, so the event is emitted right away.
          event = LogStash::Event.new
          event.set("@metadata", @metadata)
          yield event
          @state = :init
        end
      end

    rescue LogStash::Json::ParserError => e
      # An unparsable body line is dropped together with its pending action.
      # Going back to :init makes the next line be read as an action line
      # instead of being mistaken for the missing body.
      @state = :init
      log_failure(
        "messages must be in UTF-8 JSON format",
        :error => e,
        :data => data
      )
    end
  end
end
encode(data) click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 78
# Encoding is not supported by this codec: every call raises a RuntimeError
# with the message "Not implemented", regardless of +data+.
def encode(data)
  raise RuntimeError, "Not implemented"
end
register() click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 23
# Sets up the codec's state: a line codec with its charset set to UTF-8 in
# +@lines+, the parser state +:init+ in +@state+, and an empty hash in
# +@metadata+.
def register
  @lines = LogStash::Codecs::Line.new.tap { |line_codec| line_codec.charset = "UTF-8" }
  @state = :init
  @metadata = {}
end

Private Instance Methods

log_failure(message, opts) click to toggle source
# File lib/logstash/codecs/bulk_es.rb, line 18
# Sends +message+, prefixed with "[BulkES Json Parse Failure] ", to
# +@logger+ at error level, passing +opts+ along as the second argument.
def log_failure(message, opts)
  prefixed = "[BulkES Json Parse Failure] #{message}"
  @logger.error(prefixed, opts)
end