module Sequel::Plugins::Elasticsearch::ClassMethods

The class methods that will be added to the Sequel::Model.

Attributes

elasticsearch_index[RW]

The Elasticsearch index to which the documents will be written.

elasticsearch_opts[RW]

The extra options that will be passed to the Elasticsearch client.

elasticsearch_type[RW]

The Elasticsearch type to which the documents will be written.

Public Instance Methods

alias_index(new_index)

Remove the `elasticsearch_index` alias from any previous indices and point it to the new index.

# File lib/sequel/plugins/elasticsearch.rb, line 129
def alias_index(new_index)
  es_client.indices.update_aliases body: {
    actions: [
      { remove: { index: "#{elasticsearch_index}*", alias: elasticsearch_index } },
      { add: { index: new_index, alias: elasticsearch_index } }
    ]
  }
end
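
As a rough illustration of the request body that alias_index sends, the sketch below builds the same two actions with a hypothetical helper, `alias_actions`, which is not part of the plugin. The alias and index names are made-up values.

```ruby
# Hypothetical helper (not part of the plugin): builds the body that
# alias_index passes to indices.update_aliases.
def alias_actions(alias_name, new_index)
  {
    actions: [
      # Detach the alias from every index whose name starts with the alias name.
      { remove: { index: "#{alias_name}*", alias: alias_name } },
      # Attach the alias to the new index.
      { add: { index: new_index, alias: alias_name } }
    ]
  }
end

body = alias_actions(:products, :'products-20191004.123456')
puts body[:actions].map(&:keys).flatten.inspect
```

Both actions travel in a single update_aliases request, so the removal and the addition are submitted together rather than as two separate calls.
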
call_es() { || ... }

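Wrapper method that handles errors for Elasticsearch calls. Not-found, transport, and connection errors are rescued, logged as a warning to the first database logger (if any), and turned into a `nil` return value.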

# File lib/sequel/plugins/elasticsearch.rb, line 71
def call_es
  yield
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound,
       ::Elasticsearch::Transport::Transport::Error,
       Faraday::ConnectionFailed => e
  db.loggers.first.warn e if db.loggers.count.positive?
  nil
end
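
The rescue-and-return-nil pattern above can be tried without a cluster. The following is a minimal sketch under assumptions: `FakeTransportError`, `LOGGERS`, and `safe_call` are stand-ins invented for the example, replacing the real Elasticsearch and Faraday error classes and `db.loggers`.

```ruby
require 'logger'
require 'stringio'

# Stand-in for the Elasticsearch/Faraday error classes; an assumption made
# so the sketch runs without the elasticsearch gem installed.
class FakeTransportError < StandardError; end

LOG_OUTPUT = StringIO.new
LOGGERS = [Logger.new(LOG_OUTPUT)].freeze

# Same shape as call_es: yield, and on a known error log a warning and
# return nil instead of raising.
def safe_call
  yield
rescue FakeTransportError => e
  LOGGERS.first.warn e if LOGGERS.count.positive?
  nil
end

ok = safe_call { :found }
failed = safe_call { raise FakeTransportError, 'index_not_found' }
puts ok.inspect
puts failed.inspect
```

Callers therefore need to treat `nil` as "the call failed or found nothing"; the log is the only place where the two cases can be told apart.
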
es(query = '', opts = {})

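Execute a search or a scroll on the Model's Elasticsearch index. If `query` is a Result, the next page of its scroll is fetched; otherwise a search is run. This method is “safe” in that it goes through call_es, which rescues the more common errors and returns `nil` instead of raising.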

# File lib/sequel/plugins/elasticsearch.rb, line 66
def es(query = '', opts = {})
  call_es { query.is_a?(Result) ? scroll!(query, opts) : es!(query, opts) }
end
es!(query = '', opts = {})

Execute a search on the Model's Elasticsearch index without catching Errors.

# File lib/sequel/plugins/elasticsearch.rb, line 47
def es!(query = '', opts = {})
  opts = {
    index: elasticsearch_index,
    type: elasticsearch_type
  }.merge(opts)
  query.is_a?(String) ? opts[:q] = query : opts[:body] = query
  Result.new es_client.search(opts), self
end
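
The way es! assembles the search options can be sketched in isolation. `search_opts`, `DEFAULT_INDEX`, and `DEFAULT_TYPE` below are hypothetical names used only for this example; they stand in for the method body and the model's `elasticsearch_index` and `elasticsearch_type`.

```ruby
# Assumed stand-ins for the model's elasticsearch_index / elasticsearch_type.
DEFAULT_INDEX = :products
DEFAULT_TYPE = nil

# Hypothetical helper mirroring the option handling in es!.
def search_opts(query, opts = {})
  # Caller-supplied options win over the model defaults.
  merged = { index: DEFAULT_INDEX, type: DEFAULT_TYPE }.merge(opts)
  if query.is_a?(String)
    merged[:q] = query      # String: sent as a query-string search
  else
    merged[:body] = query   # Anything else: sent as the request body
  end
  merged
end

puts search_opts('name:widget').inspect
puts search_opts({ query: { match_all: {} } }, { size: 5 }).inspect
```

A String query ends up under `:q`, while a Hash written in the query DSL ends up under `:body`, so the same method covers both styles of search.
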
es_client()

Return the Elasticsearch client used to communicate with the cluster.

# File lib/sequel/plugins/elasticsearch.rb, line 42
def es_client
  @es_client = ::Elasticsearch::Client.new elasticsearch_opts
end
import!(index: nil, dataset: nil, batch_size: 100)

Import the whole dataset into Elasticsearch.

This assumes that a template that covers all the possible index names has been created. See timestamped_index for examples of the indices that will be created.

This adds or updates records in the given index, falling back to the last index created by this utility and then to `elasticsearch_index`. Use the reindex! method to create a completely new index and alias.

# File lib/sequel/plugins/elasticsearch.rb, line 88
def import!(index: nil, dataset: nil, batch_size: 100)
  dataset ||= self.dataset
  index_name = index || last_index || elasticsearch_index

  # Index all the documents
  body = []
  dataset.each_page(batch_size) do |ds|
    body = []
    ds.all.each do |row|
      print '.'
      body << { update: import_object(index_name, row) }
    end
    puts '/'
    es_client.bulk body: body
    body = nil
  end
end
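
The batching in import! can be approximated without a database or a cluster. In this sketch, which rests on several assumptions, `RecordingClient` is a made-up stand-in for the Elasticsearch client, plain hashes stand in for model rows, and `each_slice` stands in for the dataset's `each_page`.

```ruby
# Stand-in client that records each bulk request instead of sending it.
class RecordingClient
  attr_reader :requests

  def initialize
    @requests = []
  end

  def bulk(body:)
    @requests << body
  end
end

rows = (1..5).map { |id| { id: id, name: "item #{id}" } }
client = RecordingClient.new

# One bulk request per batch, as in import!; each_slice stands in for
# the dataset's each_page.
rows.each_slice(2) do |batch|
  body = batch.map do |row|
    { update: { _index: 'products-20191004.123456', _id: row[:id],
                data: { doc: row, doc_as_upsert: true } } }
  end
  client.bulk body: body
end

puts client.requests.map(&:size).inspect # prints [2, 2, 1]
```

With five rows and a batch size of two, three bulk requests are issued, which is the trade-off that `batch_size` controls: fewer, larger requests versus more, smaller ones.
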
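import_object(idx, row)

Build the metadata for a single bulk `update` action: the target index, the row's `document_id`, and the row's `as_indexed_json` as an upsert document. The `_type` is included only when `elasticsearch_type` is set.
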
# File lib/sequel/plugins/elasticsearch.rb, line 106
def import_object(idx, row)
  val = {
    _index: idx,
    _id: row.document_id,
    data: { doc: row.as_indexed_json, doc_as_upsert: true }
  }
  val[:_type] = elasticsearch_type if elasticsearch_type
  val
end
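
To make the resulting hash concrete, here is a small sketch that uses a hypothetical `Row` struct in place of a model instance and a hypothetical `bulk_update_meta` helper that takes the type as an argument instead of reading `elasticsearch_type`.

```ruby
# Hypothetical row object; real rows are model instances that respond to
# document_id and as_indexed_json.
Row = Struct.new(:id, :name) do
  def document_id
    id
  end

  def as_indexed_json
    to_h
  end
end

# Same logic as import_object, with the type passed in explicitly.
def bulk_update_meta(idx, row, type = nil)
  val = { _index: idx, _id: row.document_id,
          data: { doc: row.as_indexed_json, doc_as_upsert: true } }
  val[:_type] = type if type
  val
end

meta = bulk_update_meta('products', Row.new(7, 'widget'))
typed = bulk_update_meta('products', Row.new(7, 'widget'), 'product')
puts meta.inspect
puts typed.inspect
```

Because `doc_as_upsert` is set, the same action creates the document when it is missing and updates it when it already exists.
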
last_index()

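Find the last created index that matches the specified index name. The lookup goes through the `elasticsearch_index` alias and returns `nil` when the alias does not exist.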

# File lib/sequel/plugins/elasticsearch.rb, line 139
def last_index
  es_client.indices.get_alias(name: elasticsearch_index)&.keys&.sort&.first
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
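
The selection step in last_index can be illustrated on a sample response. The hash below is hypothetical, trimmed data shaped like an alias lookup result, with index names as keys.

```ruby
# Sample response shaped like the hash returned by indices.get_alias:
# index names as keys (the values here are trimmed, hypothetical data).
response = {
  'products-20191005.090000' => { 'aliases' => { 'products' => {} } },
  'products-20191004.123456' => { 'aliases' => { 'products' => {} } }
}

# Same selection as last_index: sort the index names and take the first.
selected = response&.keys&.sort&.first
puts selected # prints products-20191004.123456
```

With several indices behind the alias, the lexicographically smallest name is returned. After alias_index has run, the alias should point at a single index, in which case there is only one candidate.
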
reindex!(index: nil, dataset: nil, batch_size: 100)

Creates a new index in Elasticsearch from the specified dataset, as well as an alias to the new index.

See the documentation on import! for more details.

# File lib/sequel/plugins/elasticsearch.rb, line 120
def reindex!(index: nil, dataset: nil, batch_size: 100)
  index_name = index || timestamped_index
  import!(index: index_name, dataset: dataset, batch_size: batch_size)

  # Create an alias to the newly created index
  alias_index(index_name)
end
scroll!(scroll_id, duration)

Fetch the next page in a scroll without catching Errors.

# File lib/sequel/plugins/elasticsearch.rb, line 57
def scroll!(scroll_id, duration)
  scroll_id = scroll_id.scroll_id if scroll_id.is_a? Result
  return nil unless scroll_id

  Result.new es_client.scroll(body: scroll_id, scroll: duration), self
end
timestamped_index()

Generate a timestamped index name. This will use the current timestamp to construct index names like this:

base-name-20191004.123456
# File lib/sequel/plugins/elasticsearch.rb, line 149
def timestamped_index
  time_str = Time.now.strftime('%Y%m%d.%H%M%S') # TODO: Make the format configurable
  "#{elasticsearch_index}-#{time_str}".to_sym
end
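
A reproducible variant of the name generation, as a minimal sketch: `timestamped_name` is a hypothetical helper that takes the base name and the time as arguments, where the real method reads `elasticsearch_index` and `Time.now`.

```ruby
# Hypothetical variant of timestamped_index that takes the base name and
# the time as arguments, so the result is reproducible.
def timestamped_name(base, time = Time.now)
  "#{base}-#{time.strftime('%Y%m%d.%H%M%S')}".to_sym
end

name = timestamped_name('base-name', Time.utc(2019, 10, 4, 12, 34, 56))
puts name.inspect # prints :"base-name-20191004.123456"
```

Note that the result is a Symbol, not a String, and that the timestamp has one-second resolution, so two names generated within the same second are identical.
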