class Elasticsearch::Helpers::BulkHelper

Elasticsearch Client Helper for the Bulk API

@see www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html

Attributes

index[RW]

Public Class Methods

new(client, index, params = {})

Create a BulkHelper

@param [Elasticsearch::Client] client Instance of Elasticsearch client to use.
@param [String] index Index on which to perform the Bulk actions.
@param [Hash] params Parameters to re-use in every bulk call.

# File lib/elasticsearch/helpers/bulk_helper.rb, line 33
def initialize(client, index, params = {})
  @client = client
  @index = index
  @params = params
end
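
Both `delete` and the private `bulk_request` build their request arguments with `params.merge(@params)`, so when the same key is given in both places the value passed to the constructor takes precedence. The following is a minimal sketch of that merge in plain Ruby, with no client involved; the parameter values are made up for illustration.

```ruby
# Parameters passed to BulkHelper.new (stored as @params).
helper_params = { refresh: true }

# Parameters passed to an individual call such as ingest or delete.
call_params = { refresh: false, pipeline: 'my-pipeline' }

# Same expression the helper uses to build the arguments for client.bulk.
request = { body: [] }.merge(call_params.merge(helper_params))

puts request.inspect
```

Per-call parameters are therefore best reserved for keys that were not already set on the helper.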

Public Instance Methods

delete(ids, params = {}, body = {})

Delete documents using the Bulk API

@param [Array] ids Array of IDs of the documents to delete.
@param [Hash] params Parameters to send to bulk delete.

# File lib/elasticsearch/helpers/bulk_helper.rb, line 64
def delete(ids, params = {}, body = {})
  delete_docs = ids.map { |id| { delete: { _index: @index, _id: id} } }
  @client.bulk({ body: delete_docs }.merge(params.merge(@params)))
end
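
To make the request body concrete, the sketch below reproduces the mapping that `delete` applies to the IDs. It runs without a client; the index name and IDs are hypothetical.

```ruby
index = 'books'   # hypothetical index name
ids = %w[1 2 3]   # hypothetical document IDs

# One delete action per ID, as built inside BulkHelper#delete.
delete_docs = ids.map { |id| { delete: { _index: index, _id: id } } }

delete_docs.each { |action| puts action.inspect }
```

The resulting Array is what the helper sends as the `body` of the bulk request.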
ingest(docs, params = {}, body = {}, &block)

Index documents using the Bulk API.

@param [Array<Hash>] docs The documents to be indexed.
@param [Hash] params Parameters to use in the bulk ingestion. See the official Elastic documentation for the Bulk API for the parameters it accepts.
@option params [Integer] slice Number of documents to send to the Bulk API for each batch of ingestion.
@param block [Block] Optional block to run after ingesting a batch of documents.
@yieldparam response [Elasticsearch::Transport::Response] The response object from calling the Bulk API.
@yieldparam ingest_docs [Array<Hash>] The collection of documents sent in the bulk request.

# File lib/elasticsearch/helpers/bulk_helper.rb, line 48
def ingest(docs, params = {}, body = {}, &block)
  ingest_docs = docs.map { |doc| { index: { _index: @index, data: doc} } }
  if (slice = params.delete(:slice))
    ingest_docs.each_slice(slice) do |items|
      ingest(items.map { |item| item[:index][:data] }, params, &block)
    end
  else
    bulk_request(ingest_docs, params, &block)
  end
end
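
When `slice` is given, the documents are split with `each_slice` and one bulk request is issued per batch, so a block passed to `ingest` runs once per batch. The sketch below shows only the batching step in plain Ruby; the documents and the slice size are made up.

```ruby
# Five hypothetical documents.
docs = (1..5).map { |n| { 'title' => "Book #{n}" } }
slice = 2

# each_slice yields consecutive groups of at most `slice` elements.
batches = docs.each_slice(slice).to_a

batches.each_with_index do |batch, i|
  puts "batch #{i + 1}: #{batch.size} document(s)"
end
```

The helper reads the option with `params.delete(:slice)`, which removes the key from the Hash that was passed in.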
ingest_json(file, params = {}, &block)

Ingest data directly from a JSON file

@param [String] file (Required) The file path.
@param [Hash] params Parameters to use in the bulk ingestion.
@option params [Integer] slice Number of documents to send to the Bulk API for each batch of ingestion.
@option params [Array|String] keys If the data is nested inside the JSON file, the keys needed to reach it can be passed in with this parameter.

E.g.: If the data in the parsed JSON Hash is found in
+json_parsed['data']['items']+, keys would be passed
like this (as an Array):

+bulk_helper.ingest_json(file, { keys: ['data', 'items'] })+

or as a String:

+bulk_helper.ingest_json(file, { keys: 'data,items' })+

@yieldparam response [Elasticsearch::Transport::Response] The response object from calling the Bulk API.
@yieldparam ingest_docs [Array<Hash>] The collection of documents sent in the bulk request.

# File lib/elasticsearch/helpers/bulk_helper.rb, line 110
def ingest_json(file, params = {}, &block)
  data = JSON.parse(File.read(file))
  if (keys = params.delete(:keys))
    keys = keys.split(',') if keys.is_a?(String)
    data = data.dig(*keys)
  end

  ingest(data, params, &block)
end
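
The `keys` handling can be tried in isolation. The sketch below applies the same `split` and `dig` steps to an inline JSON string instead of a file; the JSON content is hypothetical.

```ruby
require 'json'

# Hypothetical JSON payload with the documents nested under data.items.
json = '{"data":{"items":[{"title":"A"},{"title":"B"}]}}'
parsed = JSON.parse(json)

keys = 'data,items'
# A String is split on commas only; surrounding whitespace is not stripped.
keys = keys.split(',') if keys.is_a?(String)

docs = parsed.dig(*keys)
puts docs.inspect
```

Because the String form is split on `,` without stripping, a space after the comma would become part of the key name, so the Array form is the safer choice when key names are built dynamically.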
update(docs, params = {}, body = {}, &block)

Update documents using the Bulk API

@param [Array<Hash>] docs (Required) The documents to be updated.
@option params [Integer] slice Number of documents to send to the Bulk API for each batch of updates.
@param block [Block] Optional block to run after each batch of documents is sent.
@yieldparam response [Elasticsearch::Transport::Response] The response object from calling the Bulk API.
@yieldparam ingest_docs [Array<Hash>] The collection of documents sent in the bulk request.

# File lib/elasticsearch/helpers/bulk_helper.rb, line 78
def update(docs, params = {}, body = {}, &block)
  ingest_docs = docs.map do |doc|
    { update: { _index: @index, _id: doc.delete('id'), data: { doc: doc } } }
  end
  if (slice = params.delete(:slice))
    ingest_docs.each_slice(slice) { |items| update(items, params, &block) }
  else
    bulk_request(ingest_docs, params, &block)
  end
end
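
The mapping in `update` expects each document to carry its ID under the String key `'id'` and removes that key from the document while building the action. The sketch below reproduces that step in plain Ruby; the index name and document are hypothetical.

```ruby
index = 'books'   # hypothetical index name
docs = [{ 'id' => '1', 'title' => 'New title' }]

# Same mapping as BulkHelper#update: the 'id' key is removed from the
# document and used as _id; the remaining fields become the partial doc.
actions = docs.map do |doc|
  { update: { _index: index, _id: doc.delete('id'), data: { doc: doc } } }
end

puts actions.first.inspect
puts docs.first.inspect   # the caller's Hash no longer contains 'id'
```

Since `doc.delete('id')` mutates the Hash that was passed in, callers who need the original documents afterwards may want to pass copies.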

Private Instance Methods

bulk_request(ingest_docs, params) { |response, ingest_docs| ... }
# File lib/elasticsearch/helpers/bulk_helper.rb, line 122
def bulk_request(ingest_docs, params, &block)
  response = @client.bulk({ body: ingest_docs }.merge(params.merge(@params)))
  yield response, ingest_docs if block_given?
  response
end
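
A block given to `ingest` or `update` receives the response and the actions that were sent, which makes it a natural place to check for per-item failures. The sketch below assumes the response body follows the Bulk API shape (a top-level `errors` flag and an `items` array with one entry per action). A hand-written Hash stands in for the real response object, and `failed_items` is a hypothetical helper name, not part of the library.

```ruby
# Hypothetical helper: returns the items of a bulk response that carry an error.
def failed_items(response)
  return [] unless response['errors']

  response['items'].select { |item| item.values.first.key?('error') }
end

# Hand-written stand-in for a Bulk API response body.
response = {
  'errors' => true,
  'items' => [
    { 'index' => { '_id' => '1', 'status' => 201 } },
    { 'index' => { '_id' => '2', 'status' => 400,
                   'error' => { 'type' => 'mapper_parsing_exception' } } }
  ]
}

failed = failed_items(response)
failed.each do |item|
  action, result = item.first
  puts "#{action} #{result['_id']} failed with status #{result['status']}"
end
```

Inside a real block the same check would run on the yielded `response`, provided it exposes the parsed body with Hash-style access.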