class SearchFlip::Bulk
@api private
The SearchFlip::Bulk class implements bulk support, i.e. it collects single requests and emits them in batches.
@example
  SearchFlip::Bulk.new "http://127.0.0.1:9200/index/type/_bulk" do |bulk|
    bulk.create record.id, MyIndex.serialize(record)
    bulk.index record.id, MyIndex.serialize(record), version: record.version, version_type: "external"
    bulk.delete record.id, routing: record.user_id
    bulk.update record.id, doc: MyIndex.serialize(record)
  end
Attributes
Public Class Methods
@api private
Builds and yields a new Bulk object, i.e. initializes the buffer, yields, sends a batch of records each time the buffer is full, and sends a final batch after the yielded code returns if documents are still present in the buffer.
@example Basic use
  SearchFlip::Bulk.new "http://127.0.0.1:9200/index/type/_bulk" do |bulk|
    # ...
  end
@example Ignore certain errors
  SearchFlip::Bulk.new "http://127.0.0.1:9200/index/type/_bulk", bulk_limit: 1_000, ignore_errors: [409] do |bulk|
    # ...
  end
@param url [String] The endpoint to send bulk requests to
@param options [Hash] Options for the bulk requests
@option options bulk_limit [Fixnum] The maximum number of documents per bulk request
@option options bulk_max_mb [Fixnum] The maximum payload size in megabytes per bulk request
@option options ignore_errors [Array, Fixnum] Errors that should be ignored. For example, to ignore errors resulting from conflicts, pass 409 here.
@option options raise [Boolean] Pass false if the bulk requests should never raise any exceptions (fire and forget). Default is true.
@option options http_client [SearchFlip::HTTPClient] An optional http client instance
# File lib/search_flip/bulk.rb, line 52
def initialize(url, options = {})
  @url = url
  @options = options
  @http_client = options[:http_client] || SearchFlip::HTTPClient.new
  @ignore_errors = Array(options[:ignore_errors] || []).to_set
  @bulk_limit = options[:bulk_limit] || SearchFlip::Config[:bulk_limit]
  @bulk_max_mb = options[:bulk_max_mb] || SearchFlip::Config[:bulk_max_mb]

  @bulk_max_bytes = @bulk_max_mb * 1024 * 1024

  init

  yield self

  upload if @num > 0
end
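The option handling in the constructor can be tried in isolation. The following is a minimal sketch using only the Ruby standard library; the helper names `normalize_ignore_errors` and `max_bytes` are hypothetical and exist only for illustration, and the example numbers are not SearchFlip defaults.

```ruby
require "set"

# Hypothetical helper mirroring Array(options[:ignore_errors] || []).to_set:
# a single status code, an array of codes, or nil all end up as a Set.
def normalize_ignore_errors(value)
  Array(value || []).to_set
end

# Hypothetical helper mirroring the megabyte-to-byte conversion.
def max_bytes(bulk_max_mb)
  bulk_max_mb * 1024 * 1024
end

single = normalize_ignore_errors(409)        # bare status code
list   = normalize_ignore_errors([409, 404]) # array of status codes
none   = normalize_ignore_errors(nil)        # option not given

puts single.include?(409)
puts list.size
puts none.empty?
puts max_bytes(10)
```

Because the result is a Set, the later membership check against each response status stays cheap no matter how many codes are ignored.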
Public Instance Methods
@api private
Adds a create request to the bulk batch.
@param id [Fixnum, String] The document/record id
@param object [Object] The document; it is serialized via SearchFlip::JSON.generate before being added
@param options [Hash] Options for the create request, e.g. routing and versioning
# File lib/search_flip/bulk.rb, line 102
def create(id, object, options = {})
  perform(:create, id, SearchFlip::JSON.generate(object), options)
end
@api private
Adds a delete request to the bulk batch.
@param id [Fixnum, String] The document/record id
@param options [Hash] Options for the delete request, e.g. routing and versioning
# File lib/search_flip/bulk.rb, line 127
def delete(id, options = {})
  perform(:delete, id, nil, options)
end
# File lib/search_flip/bulk.rb, line 89
def import(*args)
  index(*args)
end
@api private
Adds an index request to the bulk batch.
@param id [Fixnum, String] The document/record id
@param object [Object] The document; it is serialized via SearchFlip::JSON.generate before being added
@param options [Hash] Options for the index request, e.g. routing and versioning
# File lib/search_flip/bulk.rb, line 79
def index(id, object, options = {})
  perform(:index, id, SearchFlip::JSON.generate(object), options)
end
@api private
Adds an update request to the bulk batch.
@param id [Fixnum, String] The document/record id
@param object [Object] The update body; it is serialized via SearchFlip::JSON.generate before being added
@param options [Hash] Options for the update request, e.g. routing and versioning
# File lib/search_flip/bulk.rb, line 115
def update(id, object, options = {})
  perform(:update, id, SearchFlip::JSON.generate(object), options)
end
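Each of the request methods above contributes an action line and, except for delete, a document line to the newline-delimited payload. The sketch below reproduces that shape with the standard `json` library instead of SearchFlip::JSON; `bulk_lines` is a hypothetical helper, not part of SearchFlip.

```ruby
require "json"

# Hypothetical helper: builds the action line (options merged with _id)
# and, when a document is given, a second line holding the document.
def bulk_lines(action, id, object = nil, options = {})
  lines = JSON.generate(action => options.merge(_id: id)) + "\n"
  lines << JSON.generate(object) << "\n" if object
  lines
end

payload = +""
payload << bulk_lines(:index, 1, { title: "First" }, routing: 7)
payload << bulk_lines(:update, 1, { doc: { title: "Changed" } })
payload << bulk_lines(:delete, 2)

puts payload
# {"index":{"routing":7,"_id":1}}
# {"title":"First"}
# {"update":{"_id":1}}
# {"doc":{"title":"Changed"}}
# {"delete":{"_id":2}}
```

Note that a call such as `bulk.update record.id, doc: ...` passes the trailing hash as the `object` argument, so the `doc` key ends up in the document line rather than in the action line.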
Private Instance Methods
# File lib/search_flip/bulk.rb, line 133
def init
  @payload = ""
  @num = 0
end
# File lib/search_flip/bulk.rb, line 164
def perform(action, id, json = nil, options = {})
  new_payload = SearchFlip::JSON.generate(action => options.merge(_id: id))
  new_payload << "\n"

  if json
    new_payload << json
    new_payload << "\n"
  end

  upload if @num > 0 && @payload.bytesize + new_payload.bytesize >= @bulk_max_bytes

  @payload << new_payload

  @num += 1

  upload if @num >= @bulk_limit || @payload.bytesize >= @bulk_max_bytes
end
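The two flush conditions in perform (one checked before appending, one after) can be observed without a server. This is a rough sketch, assuming a hypothetical `SketchBuffer` class that records each flushed batch instead of sending it; the limits used are arbitrary.

```ruby
# Hypothetical buffer that records each flushed batch instead of uploading it.
class SketchBuffer
  attr_reader :batches

  def initialize(limit:, max_bytes:)
    @limit = limit
    @max_bytes = max_bytes
    @batches = []
    reset
  end

  def add(entry)
    # Flush first if appending this entry would reach the byte cap.
    flush if @num > 0 && @payload.bytesize + entry.bytesize >= @max_bytes

    @payload << entry
    @num += 1

    # Flush afterwards if either the count cap or the byte cap is reached.
    flush if @num >= @limit || @payload.bytesize >= @max_bytes
  end

  def flush
    @batches << @payload unless @payload.empty?
    reset
  end

  private

  def reset
    @payload = +""
    @num = 0
  end
end

# Feeds all entries through the buffer and returns the byte size of each batch.
def batch_sizes(entries, limit:, max_bytes:)
  buffer = SketchBuffer.new(limit: limit, max_bytes: max_bytes)
  entries.each { |entry| buffer.add(entry) }
  buffer.flush # final batch for whatever is left over
  buffer.batches.map(&:bytesize)
end

entries = Array.new(5) { "aaaa\n" } # five entries of 5 bytes each

p batch_sizes(entries, limit: 2, max_bytes: 1_000) # count cap triggers
p batch_sizes(entries, limit: 100, max_bytes: 12)  # byte cap triggers
```

Both calls yield batches of 10, 10 and 5 bytes: in the first the count cap closes each batch after two entries, in the second the pre-append byte check closes it before a third entry would be added.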
# File lib/search_flip/bulk.rb, line 138
def upload
  response =
    @http_client
      .headers(accept: "application/json", content_type: "application/x-ndjson")
      .put(url, body: @payload)

  return if options[:raise] == false

  parsed_response = SearchFlip::JSON.parse(response.to_s)

  return unless parsed_response["errors"]

  parsed_response["items"].each do |item|
    item.each do |_, element|
      status = element["status"]

      next if status.between?(200, 299)
      next if ignore_errors.include?(status)

      raise SearchFlip::Bulk::Error, SearchFlip::JSON.generate(element)
    end
  end
ensure
  init
end
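The response check in upload can be illustrated on a hand-written response body. The sketch below uses the standard `json` and `set` libraries; `first_failure` and `sample_response` are hypothetical names, and the sample is an assumed minimal shape of a bulk response, not captured server output. Instead of raising, the helper returns the offending element so the filtering is easy to inspect.

```ruby
require "json"
require "set"

# Hypothetical helper: returns the first item element whose status is
# neither 2xx nor in the ignored set, or nil when there is none.
def first_failure(body, ignore_errors = Set.new)
  parsed = JSON.parse(body)
  return nil unless parsed["errors"]

  parsed["items"].each do |item|
    item.each_value do |element|
      status = element["status"]

      next if status.between?(200, 299)
      next if ignore_errors.include?(status)

      return element
    end
  end

  nil
end

# Hand-written sample: one successful index and one conflicting create.
def sample_response
  JSON.generate(
    "errors" => true,
    "items" => [
      { "index" => { "_id" => "1", "status" => 201 } },
      { "create" => { "_id" => "2", "status" => 409 } }
    ]
  )
end

p first_failure(sample_response)           # the 409 element
p first_failure(sample_response, Set[409]) # nil, the conflict is ignored
```

In the real method the first such element causes SearchFlip::Bulk::Error to be raised, the whole check is skipped when the `raise` option is false, and the `ensure` clause resets the buffer in every case.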