class LogStash::Outputs::ElasticWorkplaceSearch
Public Instance Methods
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 53
#
# Formats the given events and indexes them in consecutive slices of at
# most 100, one +index+ call per slice.
#
# events - the collection of events to send; must respond to +each_slice+.
def multi_receive(events)
  # because Workplace Search has a limit of 100 documents per bulk
  events.each_slice(100) do |slice|
    # The block parameter is deliberately not called `events`, so it does
    # not shadow the method parameter.
    batch = format_batch(slice)
    if @logger.trace?
      # Guarded so that batch.inspect is only computed when trace logging is on.
      @logger.trace("Sending bulk to Workplace Search", :size => batch.size, :data => batch.inspect)
    end
    index(batch)
  end
end
register()
click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 40
#
# Points the Workplace Search library at "<url>/api/ws/v1/" (one trailing
# slash of @url is trimmed first), builds the client from @access_token and
# verifies the connection via +check_connection!+.
#
# Raises ::LogStash::ConfigurationError for any StandardError raised along
# the way, with a dedicated message for invalid credentials (401) and for a
# non-existent record (404).
def register
  base_url = @url.chomp('/')
  Elastic::WorkplaceSearch.endpoint = "#{base_url}/api/ws/v1/"
  @client = Elastic::WorkplaceSearch::Client.new(:access_token => @access_token.value)
  check_connection!
rescue Elastic::WorkplaceSearch::InvalidCredentials
  raise ::LogStash::ConfigurationError, "Failed to connect to Workplace Search. Error: 401. Please check your credentials"
rescue Elastic::WorkplaceSearch::NonExistentRecord
  raise ::LogStash::ConfigurationError, "Failed to connect to Workplace Search. Error: 404. Please check if url '#{@url}' is correct and you've created a source with ID '#{@source}'"
rescue => error
  # Anything else is reported with the underlying exception's message.
  raise ::LogStash::ConfigurationError, "Failed to connect to Workplace Search. #{error.message}"
end
Private Instance Methods
check_connection!()
click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 103
#
# Probes the connection by asking the client to list all permissions of the
# configured source (@source). Any exception raised by the client propagates
# to the caller; the client's return value is returned unchanged.
def check_connection!
  @client.list_all_permissions(@source)
end
format_batch(events)
click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 65
#
# Converts each event into the hash that is sent for indexing.
#
# For every event: takes +event.to_hash+, removes "@timestamp" (re-storing
# its value under @timestamp_destination when that is set), sets "id" from
# +event.sprintf(@document_id)+ when @document_id is set, and removes
# "@version".
#
# Returns an array with one hash per event, in the same order.
def format_batch(events)
  events.map do |event|
    doc = event.to_hash

    # we need to remove default fields that start with "@"
    # since Elastic Workplace Search doesn't accept them
    timestamp = doc.delete("@timestamp")
    doc[@timestamp_destination] = timestamp if @timestamp_destination

    doc["id"] = event.sprintf(@document_id) if @document_id

    doc.delete("@version")
    doc
  end
end
index(documents)
click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 83
#
# Sends the documents to the configured source through the client and hands
# the response to +report+.
#
# On any StandardError the failure is logged, the method sleeps for one
# second and the whole operation is attempted again.
#
# NOTE(review): the retry is unbounded, and because +report+ runs inside the
# rescued section, an exception raised while reporting also causes the same
# documents to be sent again.
def index(documents)
  begin
    response = @client.index_documents(@source, documents)
    report(documents, response)
  rescue StandardError => error
    @logger.error("Failed to execute index operation. Retrying..", :exception => error.class, :reason => error.message)
    sleep(1)
    retry
  end
end
report(documents, response)
click to toggle source
# File lib/logstash/outputs/elastic_workplace_search.rb, line 92
#
# Logs the indexing outcome of every document.
#
# documents - the documents that were sent.
# response  - the indexing response; the entry at response["results"][i]
#             is read for the i-th document and its "errors" value inspected.
#
# A document with a non-empty "errors" value is logged at warn level;
# otherwise a trace message is emitted when trace logging is enabled.
def report(documents, response)
  documents.each_with_index do |document, position|
    errors = response["results"][position]["errors"]

    unless errors.empty?
      @logger.warn("Document failed to index. Dropping..", :document => document, :errors => errors.to_a)
      next
    end

    @logger.trace("Document was indexed with no errors", :document => document) if @logger.trace?
  end
end