class LogStash::Outputs::ElasticWorkplaceSearch

Public Instance Methods

multi_receive(events) click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 53
def multi_receive(events)
  # because Workplace Search has a limit of 100 documents per bulk
  events.each_slice(100) do |events|
    batch = format_batch(events)
    if @logger.trace?
      @logger.trace("Sending bulk to Workplace Search", :size => batch.size, :data => batch.inspect)
    end
    index(batch)
  end
end
register() click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 40
def register
  Elastic::WorkplaceSearch.endpoint = "#{@url.chomp('/')}/api/ws/v1/"
  @client = Elastic::WorkplaceSearch::Client.new(:access_token => @access_token.value)
  check_connection!
rescue Elastic::WorkplaceSearch::InvalidCredentials
  raise ::LogStash::ConfigurationError.new("Failed to connect to Workplace Search. Error: 401. Please check your credentials")
rescue Elastic::WorkplaceSearch::NonExistentRecord
  raise ::LogStash::ConfigurationError.new("Failed to connect to Workplace Search. Error: 404. Please check if url '#{@url}' is correct and you've created a source with ID '#{@source}'")
rescue => e
  raise ::LogStash::ConfigurationError.new("Failed to connect to Workplace Search. #{e.message}")
end

Private Instance Methods

check_connection!() click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 103
def check_connection!
  @client.list_all_permissions(@source)
end
format_batch(events) click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 65
def format_batch(events)
  events.map do |event|
    doc = event.to_hash
    # we need to remove default fields that start with "@"
    # since Elastic Workplace Search doesn't accept them
    if @timestamp_destination
      doc[@timestamp_destination] = doc.delete("@timestamp")
    else # delete it
      doc.delete("@timestamp")
    end
    if @document_id
      doc["id"] = event.sprintf(@document_id)
    end
    doc.delete("@version")
    doc
  end
end
index(documents) click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 83
def index(documents)
  response = @client.index_documents(@source, documents)
  report(documents, response)
rescue => e
  @logger.error("Failed to execute index operation. Retrying..", :exception => e.class, :reason => e.message)
  sleep(1)
  retry
end
report(documents, response) click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 92
def report(documents, response)
  documents.each_with_index do |document, i|
    errors = response["results"][i]["errors"]
    if errors.empty?
      @logger.trace? && @logger.trace("Document was indexed with no errors", :document => document)
    else
      @logger.warn("Document failed to index. Dropping..", :document => document, :errors => errors.to_a)
    end
  end
end