class Scalastic::Partition
Constants
- Endpoint
- Endpoints
Attributes
config[R]
es_client[R]
id[R]
Public Class Methods
new(es_client, config, id)
click to toggle source
# File lib/scalastic/partition.rb, line 17
#
# Builds a partition handle around an ES client, a configuration object and
# a partition id.
#
# @param es_client the client object later used for every request
# @param config the configuration object later asked for endpoint names
# @param id the partition id; must not be nil or stringify to ''
# @raise [ArgumentError] when es_client or config is nil, or id is nil/empty
def initialize(es_client, config, id)
  # Checks run in the same order as the arguments, so the first problem wins.
  if es_client.nil?
    raise ArgumentError, 'ES client is nil!'
  elsif config.nil?
    raise ArgumentError, 'config is nil!'
  elsif id.nil? || id.to_s.empty?
    raise ArgumentError, 'id is empty!'
  end

  @es_client = es_client
  @config = config
  @id = id
end
Public Instance Methods
bulk(args)
click to toggle source
# File lib/scalastic/partition.rb, line 85
#
# Sends a bulk request after passing every body entry through update_entry.
#
# The caller's +args+ hash is left untouched: the request is sent with a
# merged copy, in line with the other request methods of this class. The
# individual body entries are still handed to update_entry as-is, so any
# in-place changes it makes to them remain visible to the caller.
#
# @param args [Hash] bulk arguments; :body must be a list of bulk entries
# @return whatever es_client.bulk returns
# @raise [ArgumentError] when :body is missing, or when update_entry rejects
#   an entry
def bulk(args)
  body = args[:body] || raise(ArgumentError, 'Missing required argument :body')

  # Each accumulator element is [operation_name, entry]; update_entry reads
  # the previous element to decide how to treat an entry that has no
  # operation key of its own.
  processed = body.each_with_object([]) do |entry, acc|
    operation = operation_name(entry)
    acc << [operation, update_entry(acc, operation, entry)]
  end

  es_client.bulk(args.merge(body: processed.map { |_operation, entry| entry }))
end
create(args)
click to toggle source
# File lib/scalastic/partition.rb, line 67
#
# Creates a document through the partition's index endpoint.
#
# The document body (an empty hash when none is given) is passed to
# selector.apply_to before the request is sent. The caller's +args+ hash is
# not modified: a missing :body is no longer written back into it, so frozen
# argument hashes are accepted as well.
#
# @param args [Hash] arguments for es_client.create; :index is overridden
# @return whatever es_client.create returns
def create(args)
  body = args[:body] || {}
  selector.apply_to(body)
  es_client.create(args.merge(index: config.index_endpoint(id), body: body))
end
delete(args = {})
click to toggle source
# File lib/scalastic/partition.rb, line 74
#
# Deletes a document, addressing the request to the partition's search
# endpoint (any :index supplied by the caller is overridden).
#
# @param args [Hash] arguments for es_client.delete
# @return whatever es_client.delete returns
def delete(args = {})
  es_client.delete(args.merge(index: config.search_endpoint(id)))
end
delete_by_query(args)
click to toggle source
# File lib/scalastic/partition.rb, line 94
#
# Deletes every document matched by a query within this partition.
#
# Runs the query as a scan search with a one-minute scroll, then repeatedly
# fetches the next scroll page and bulk-deletes the returned hits until a
# page comes back empty.
#
# @param args [Hash] search arguments; :index, :search_type, :scroll, :size
#   and :fields are overridden
# @return [nil]
def delete_by_query(args)
  scan_args = args.merge(
    index: config.search_endpoint(id),
    search_type: 'scan',
    scroll: '1m',
    size: 500,
    fields: []
  )
  page = es_client.search(scan_args)

  loop do
    # The hits of the initial response are never read; each pass scrolls
    # first, using the scroll id from the previous response.
    page = es_client.scroll(scroll_id: page['_scroll_id'], scroll: '1m')
    deletions = page['hits']['hits'].map { |hit| delete_op(hit) }
    break if deletions.empty?

    es_client.bulk(body: deletions)
  end
end
exists?()
click to toggle source
# File lib/scalastic/partition.rb, line 79
#
# Tells whether any index carries this partition's search or index alias.
#
# @return [Boolean] true when at least one entry of the normalized alias
#   lookup has a non-empty 'aliases' collection
def exists?
  alias_names = [config.search_endpoint(id), config.index_endpoint(id)].join(',')
  lookup = normalized(es_client.indices.get_aliases(name: alias_names))
  lookup.any? { |_index, data| data['aliases'].any? }
end
extend_to(args)
click to toggle source
# File lib/scalastic/partition.rb, line 27
#
# Extends the partition to another index.
#
# Builds one update_aliases request that removes the partition's index alias
# from every index currently holding it, then adds the index alias and the
# search alias produced by EsActionsGenerator for the given arguments.
#
# @param args [Hash] must contain a non-empty :index
# @return whatever es_client.indices.update_aliases returns
# @raise [ArgumentError] when :index is nil or stringifies to ''
def extend_to(args)
  target = args[:index]
  if target.nil? || target.to_s.empty?
    raise(ArgumentError, 'Missing required argument :index')
  end

  write_alias = config.index_endpoint(id)
  lookup = normalized(es_client.indices.get_aliases(name: write_alias))
  holders = lookup.select { |_name, data| data['aliases'].any? }.keys

  removals = holders.map { |name| {remove: {index: name, alias: write_alias}} }
  additions = [
    {add: EsActionsGenerator.new_index_alias(config, args.merge(id: id))},
    {add: EsActionsGenerator.new_search_alias(config, args.merge(id: id))}
  ]

  es_client.indices.update_aliases(body: {actions: removals + additions})
end
get(args)
click to toggle source
# File lib/scalastic/partition.rb, line 50
#
# Fetches a document through the partition's search endpoint (any :index
# supplied by the caller is overridden).
#
# @param args [Hash] arguments for es_client.get
# @return whatever es_client.get returns
def get(args)
  es_client.get(args.merge(index: config.search_endpoint(id)))
end
get_endpoints()
click to toggle source
# File lib/scalastic/partition.rb, line 110
#
# Looks up where this partition's aliases currently point.
#
# @return [Endpoints] frozen; built from the first index-alias endpoint
#   (nil when no index carries the index alias) and a frozen list of
#   search-alias endpoints, each a frozen Endpoint of index name and routing
def get_endpoints
  search_alias = config.search_endpoint(id)
  index_alias = config.index_endpoint(id)
  lookup = normalized(es_client.indices.get_aliases(name: [search_alias, index_alias].join(',')))

  # Pair each index name with its alias properties and drop the indices that
  # do not carry the alias in question.
  search_pairs = lookup.map { |name, data| [name, data['aliases'][search_alias]] }
                       .reject { |_name, props| props.nil? }
  index_pairs = lookup.map { |name, data| [name, data['aliases'][index_alias]] }
                      .reject { |_name, props| props.nil? }

  index_endpoint = index_pairs.map { |name, props| Endpoint.new(name, props['index_routing']).freeze }.first
  search_endpoints = search_pairs.map { |name, props| Endpoint.new(name, props['search_routing']).freeze }.freeze

  Endpoints.new(index_endpoint, search_endpoints).freeze
end
index(args)
click to toggle source
# File lib/scalastic/partition.rb, line 60
#
# Indexes a document through the partition's index endpoint.
#
# The document body (an empty hash when none is given) is passed to
# selector.apply_to before the request is sent. The caller's +args+ hash is
# not modified: a missing :body is no longer written back into it, so frozen
# argument hashes are accepted as well.
#
# @param args [Hash] arguments for es_client.index; :index is overridden
# @return whatever es_client.index returns
def index(args)
  body = args[:body] || {}
  selector.apply_to(body)
  es_client.index(args.merge(index: config.index_endpoint(id), body: body))
end
index_to(args)
click to toggle source
# File lib/scalastic/partition.rb, line 122
#
# Re-points the partition's index alias.
#
# Removes the alias from the index it currently points at (if any) and,
# unless +args+ is nil, adds the alias produced by
# EsActionsGenerator.new_index_alias. Sends nothing when there is nothing to
# change.
#
# @param args [Hash, nil] arguments describing the new index alias, or nil
#   to only remove the current one
# @return the update_aliases response, or nil when no request was sent
def index_to(args)
  write_alias = config.index_endpoint(id)
  current = get_endpoints.index

  changes = []
  changes << {remove: {index: current.index, alias: write_alias}} if current
  changes << {add: EsActionsGenerator.new_index_alias(config, args.merge(id: id))} unless args.nil?

  # TODO: log a warning if there are no updates
  return nil if changes.empty?

  es_client.indices.update_aliases(body: {actions: changes})
end
inspect()
click to toggle source
# File lib/scalastic/partition.rb, line 106
#
# Short human-readable description of the partition.
#
# @return [String] "ES partition " followed by the partition id
def inspect
  partition_id = id
  "ES partition #{partition_id}"
end
mget(args)
click to toggle source
# File lib/scalastic/partition.rb, line 55
#
# Multi-get through the partition's search endpoint (any :index supplied by
# the caller is overridden).
#
# @param args [Hash] arguments for es_client.mget
# @return whatever es_client.mget returns
def mget(args)
  es_client.mget(args.merge(index: config.search_endpoint(id)))
end
msearch(args)
click to toggle source
# File lib/scalastic/partition.rb, line 44
#
# Multi-search through the partition's search endpoint (any :index supplied
# by the caller is overridden).
#
# @param args [Hash] arguments for es_client.msearch
# @return whatever es_client.msearch returns
def msearch(args)
  scoped_args = args.merge(index: config.search_endpoint(id))
  es_client.msearch(scoped_args)
end
readonly?()
click to toggle source
# File lib/scalastic/partition.rb, line 132
#
# A partition is read-only when get_endpoints reports no index endpoint.
#
# @return [Boolean]
def readonly?
  endpoints = get_endpoints
  endpoints.index.nil?
end
scroll(args)
click to toggle source
# File lib/scalastic/partition.rb, line 136
#
# Builds a Scroller for a query against the partition's search endpoint (any
# :index supplied by the caller is overridden).
#
# @param args [Hash] arguments handed to Scroller.new
# @return [Scroller]
def scroll(args)
  Scroller.new(es_client, args.merge(index: config.search_endpoint(id)))
end
search(args = {})
click to toggle source
# File lib/scalastic/partition.rb, line 39
#
# Searches through the partition's search endpoint (any :index supplied by
# the caller is overridden).
#
# @param args [Hash] arguments for es_client.search
# @return whatever es_client.search returns
def search(args = {})
  es_client.search(args.merge(index: config.search_endpoint(id)))
end
Private Instance Methods
delete_op(hit)
click to toggle source
# File lib/scalastic/partition.rb, line 161
#
# Converts a search hit into a bulk delete operation.
#
# @param hit [Hash] a hit with '_index', '_type' and '_id' string keys
# @return [Hash] {delete: {_index:, _type:, _id:}} copied from the hit
def delete_op(hit)
  target = {
    _index: hit['_index'],
    _type: hit['_type'],
    _id: hit['_id']
  }
  {delete: target}
end
operation_name(entry)
click to toggle source
# File lib/scalastic/partition.rb, line 143
#
# Finds which bulk operation an entry describes.
#
# @param entry [Hash] one element of a bulk body
# @return [Symbol, nil] the first of :create, :index, :update, :delete that
#   is a key of the entry, or nil when none is
def operation_name(entry)
  [:create, :index, :update, :delete].each do |candidate|
    return candidate if entry.has_key?(candidate)
  end
  nil
end
selector()
click to toggle source
# File lib/scalastic/partition.rb, line 165
#
# Lazily builds and caches the PartitionSelector for this partition.
#
# @return [PartitionSelector] created from config.partition_selector and id
def selector
  @selector || (@selector = PartitionSelector.new(config.partition_selector, id))
end
update_entry(acc, operation, entry)
click to toggle source
# File lib/scalastic/partition.rb, line 147
#
# Adjusts one bulk body entry, in place, for this partition.
#
# An operation entry gets its :_index set to the partition's index endpoint;
# for :index/:create operations an inline :data hash is passed to
# selector.apply_to. An entry without an operation is treated as belonging to
# the previous operation entry and is passed to selector.apply_to when that
# operation is :index or :create.
#
# @param acc [Array] already processed [operation, entry] pairs
# @param operation [Symbol, nil] the entry's operation name, nil if it has none
# @param entry [Hash] the bulk entry to adjust
# @return [Hash] the same entry object
# @raise [ArgumentError] when an entry without an operation does not follow
#   an operation entry
def update_entry(acc, operation, entry)
  write_operations = [:index, :create]

  unless operation
    previous = acc.last
    # A previous record must be create/index/update/delete
    raise(ArgumentError, "Unexpected entry: #{entry}") unless previous && previous.first

    selector.apply_to(entry) if write_operations.include?(previous.first)
    return entry
  end

  meta = entry[operation]
  meta[:_index] = config.index_endpoint(id)
  selector.apply_to(meta[:data]) if write_operations.include?(operation) && meta.has_key?(:data)
  entry
end