class ICFS::CacheElastic

Implements the ICFS::Cache interface using Elasticsearch.

Constants

DefaultSize

default page size

Maps

The ES mappings for all of the indexes

ResultsCase

the Case results fields

ResultsEntry

Entry search results fields

ResultsIndex

Index search results fields

ResultsLog

Log search results fields

Public Class Methods

new(map, es)

New instance

@param map [Hash] Maps Symbol to the String name of each Elasticsearch index. Must provide :case, :log, :entry, :action, :current, :index, and :lock.

@param es [Faraday] Faraday instance to the Elasticsearch cluster

# File lib/icfs/cache_elastic.rb, line 163
# Create a cache backed by Elasticsearch.
#
# @param map [Hash] index names keyed by Symbol (looked up as @map[...]
#   throughout this class, e.g. :case, :entry, :lock)
# @param es [Faraday] connection that every request is sent through
def initialize(map, es)
  @map = map
  @es = es
  # "host:pid" identifier; lock_take stores it in the lock document
  @name = format('%s:%d', Socket.gethostname, Process.pid).freeze
end

Public Instance Methods

_agg_filter(name, qu, sub)

filter bucket aggregation

# File lib/icfs/cache_elastic.rb, line 872
# Build a filter bucket aggregation.
#
# @param name [String] aggregation name
# @param qu [Hash] query a document must match to fall in the bucket
# @param sub [Hash, nil] optional sub-aggregations, stored under 'aggs'
# @return [Hash] { name => { 'filter' => qu[, 'aggs' => sub] } }
def _agg_filter(name, qu, sub)
  body = { 'filter' => qu }
  body['aggs'] = sub if sub
  { name => body }
end
_agg_nested(name, field, sub)

nested bucket aggregation

# File lib/icfs/cache_elastic.rb, line 882
# Build a nested bucket aggregation over the nested objects at +field+.
#
# @param name [String] aggregation name
# @param field [String] path of the nested objects
# @param sub [Hash, nil] optional sub-aggregations, stored under 'aggs'
# @return [Hash] { name => { 'nested' => { 'path' => field }[, 'aggs' => sub] } }
def _agg_nested(name, field, sub)
  body = { 'nested' => { 'path' => field } }
  body['aggs'] = sub if sub
  { name => body }
end
_agg_stats(name, field)

stats metric aggregation

# File lib/icfs/cache_elastic.rb, line 854
# Build a stats metric aggregation on +field+.
#
# @param name [String] aggregation name
# @param field [String] numeric field to summarize
# @return [Hash] { name => { 'stats' => { 'field' => field } } }
def _agg_stats(name, field)
  metric = { 'field' => field }
  { name => { 'stats' => metric } }
end
_agg_terms(name, field, sub)

terms bucket aggregation

# File lib/icfs/cache_elastic.rb, line 862
# Build a terms bucket aggregation.
#
# @param name [String] aggregation name
# @param field [String] field to bucket on
# @param sub [Hash, nil] optional sub-aggregations, stored under 'aggs'
# @param size [Integer, nil] maximum number of buckets to return. When nil
#   the 'size' key is omitted and Elasticsearch applies its default of 10
#   buckets.
# @return [Hash] { name => { 'terms' => {...}[, 'aggs' => sub] } }
def _agg_terms(name, field, sub, size = nil)
  terms = { 'field' => field }
  terms['size'] = size if size
  ag = { name => { 'terms' => terms } }
  ag[name]['aggs'] = sub if sub
  ag
end
_query_all()

match all query

# File lib/icfs/cache_elastic.rb, line 256
# Build a query that matches every document.
#
# @return [Hash] { 'match_all' => {} }
def _query_all
  query = {}
  query['match_all'] = {}
  query
end
_query_bool(must, filter, should, must_not)

bool query

# File lib/icfs/cache_elastic.rb, line 941
# Build a bool query from its clause lists.
#
# Clause lists that are nil or empty are left out. If every list is left
# out, the result is a match_all query.
#
# @param must [Array, nil] scoring clauses that must match
# @param filter [Array, nil] non-scoring clauses that must match
# @param should [Array, nil] optional clauses
# @param must_not [Array, nil] clauses that must not match
# @return [Hash] a 'bool' query, or { 'match_all' => {} }
def _query_bool(must, filter, should, must_not)
  clauses = {
    'must' => must,
    'filter' => filter,
    'should' => should,
    'must_not' => must_not,
  }.select { |_kind, list| list && !list.empty? }

  return { 'match_all' => {} } if clauses.empty?

  { 'bool' => clauses }
end
_query_constant(filter)

constant score

# File lib/icfs/cache_elastic.rb, line 1020
# Wrap +filter+ in a constant_score query.
#
# @param filter [Hash] the filter query
# @return [Hash] { 'constant_score' => { 'filter' => filter } }
def _query_constant(filter)
  wrapped = { 'filter' => filter }
  { 'constant_score' => wrapped }
end
_query_exists(field, val)

Exists query

# File lib/icfs/cache_elastic.rb, line 901
# Build an exists query on +field+.
#
# +val+ only switches the query on or off; its value is not used.
#
# @return [Hash, nil] nil when +val+ is nil
def _query_exists(field, val)
  { 'exists' => { 'field' => field } } unless val.nil?
end
_query_keyw(field, val)

keyword query

# File lib/icfs/cache_elastic.rb, line 909
# Build a keyword query.
#
# An Array value becomes a 'terms' query (match any); any other value
# becomes a 'term' query.
#
# @return [Hash, nil] nil when +val+ is nil
def _query_keyw(field, val)
  return if val.nil?

  kind = val.is_a?(Array) ? 'terms' : 'term'
  { kind => { field => val } }
end
_query_match(field, val)

match query

# File lib/icfs/cache_elastic.rb, line 247
# Build a full-text match query.
#
# @return [Hash, nil] nil when +val+ is nil or false
def _query_match(field, val)
  val ? { 'match' => { field => { 'query' => val } } } : nil
end
_query_nested(field, query)

Nested query

# File lib/icfs/cache_elastic.rb, line 454
# Build a nested query that runs +query+ against the nested objects at
# path +field+.
#
# @return [Hash] { 'nested' => { 'path' => field, 'query' => query } }
def _query_nested(field, query)
  nested = { 'path' => field, 'query' => query }
  { 'nested' => nested }
end
_query_prefix(field, val)

prefix string query

# File lib/icfs/cache_elastic.rb, line 933
# Build a prefix query.
#
# @return [Hash, nil] nil when +val+ is nil
def _query_prefix(field, val)
  { 'prefix' => { field => val } } unless val.nil?
end
_query_term(field, val)

Term query

# File lib/icfs/cache_elastic.rb, line 892
# Build a term (exact value) query.
#
# @return [Hash, nil] nil when +val+ is nil
def _query_term(field, val)
  { 'term' => { field => val } } unless val.nil?
end
_query_times(field, val_gt, val_lt)

times query

# File lib/icfs/cache_elastic.rb, line 922
# Build an exclusive range query on +field+.
#
# @param val_gt lower bound ('gt'); omitted when nil or false
# @param val_lt upper bound ('lt'); omitted when nil or false
# @return [Hash, nil] nil when both bounds are nil
def _query_times(field, val_gt, val_lt)
  return nil if [val_gt, val_lt].all?(&:nil?)

  bounds = { 'gt' => val_gt, 'lt' => val_lt }.select { |_op, bound| bound }
  { 'range' => { field => bounds } }
end
action_read(cid, anum)

(see Cache#action_read)

# File lib/icfs/cache_elastic.rb, line 551
# (see Cache#action_read)
#
# Reads the :action document whose ID is "<cid>.<anum>".
def action_read(cid, anum)
  doc_id = format('%s.%d', cid, anum)
  _read(:action, doc_id)
end
action_tags(query)

(see Cache#action_tags)

# File lib/icfs/cache_elastic.rb, line 1122
# (see Cache#action_tags)
#
# Count the tags on action tasks. The count can be limited to one case and
# to tasks matching the assignee/status/flag/time criteria.
#
# @param query [Hash] reads :caseid, :assigned, :status, :flag, :after and
#   :before; every key is optional
# @return [Hash] :query (the argument) and :list, an Array of
#   { object: { tag:, count: } } sorted by tag
# @raise [RuntimeError] 'search failed' when the search request fails
def action_tags(query)

  # tasks are aggregated as nested objects, so the task criteria go in a
  # filter aggregation inside the nested aggregation
  task_filter = [
    _query_term('tasks.assigned', query[:assigned]),
    _query_term('tasks.status', query[:status]),
    _query_term('tasks.flag', query[:flag]),
    _query_times('tasks.time', query[:after], query[:before]),
  ].compact
  qu_filt = _query_bool(nil, task_filter, nil, nil)

  # A terms aggregation returns only its top 10 buckets unless 'size' is
  # set, so tags were silently dropped. Ask for up to max_buckets instead.
  # NOTE(review): 10_000 is an assumed ceiling; it must stay within the
  # cluster's search.max_buckets limit.
  max_buckets = 10_000
  ag = _agg_terms('tags', 'tasks.tags', nil)
  ag['tags']['terms']['size'] = max_buckets
  ag = _agg_filter('filt', qu_filt, ag)
  ag = _agg_nested('nest', 'tasks', ag)

  qu = query[:caseid] ? _query_term('caseid', query[:caseid]) : _query_all()
  req = {
    'query' => qu,
    'aggs' => ag,
    'size' => 0
  }

  # run the search
  url = @map[:action] + '/_search'
  body = JSON.generate(req)
  head = { 'Content-Type' => 'application/json' }
  resp = @es.run_request(:get, url, body, head)
  raise 'search failed' unless resp.success?

  # extract tags
  buckets = JSON.parse(resp.body)['aggregations']['nest']['filt']['tags']['buckets']
  list = buckets.map do |hh|
    { object: { tag: hh['key'], count: hh['doc_count'] } }
  end

  {
    query: query,
    list: list.sort_by { |item| item[:object][:tag] }
  }
end
action_write(cid, anum, item)

(see Cache#action_write)

# File lib/icfs/cache_elastic.rb, line 559
# (see Cache#action_write)
#
# Writes +item+ as the :action document whose ID is "<cid>.<anum>".
def action_write(cid, anum, item)
  doc_id = format('%s.%d', cid, anum)
  _write(:action, doc_id, item)
end
case_read(cid)

(see Cache#case_read)

# File lib/icfs/cache_elastic.rb, line 231
# (see Cache#case_read)
#
# Reads the :case document whose ID is the case ID itself.
def case_read(cid)
  doc_type = :case
  _read(doc_type, cid)
end
case_tags(query)

(see Cache#case_tags)

# File lib/icfs/cache_elastic.rb, line 1070
# (see Cache#case_tags)
#
# Count the tags used by cases matching the query.
#
# @param query [Hash] reads :status, :template, :grantee and :perm; each is
#   optional
# @return [Hash] :query (the argument) and :list, an Array of
#   { object: { tag:, count: } } sorted by tag
# @raise [RuntimeError] 'search failed' when the search request fails
def case_tags(query)

  # build the query
  filter = [
    _query_term('status', query[:status]),
    _query_term('template', query[:template]),
  ].compact

  # grantee and perm are matched inside one nested 'access' query
  access = [
    _query_term('access.grant', query[:grantee]),
    _query_term('access.perm', query[:perm]),
  ].compact
  unless access.empty?
    qu = (access.size == 1) ? access[0] : _query_bool(nil, access, nil, nil)
    filter << _query_nested('access', qu)
  end
  qu = _query_bool(nil, filter, nil, nil)

  # A terms aggregation returns only its top 10 buckets unless 'size' is
  # set, so tags were silently dropped. Ask for up to max_buckets instead.
  # NOTE(review): 10_000 is an assumed ceiling; it must stay within the
  # cluster's search.max_buckets limit.
  max_buckets = 10_000
  ag = _agg_terms('tags', 'tags', nil)
  ag['tags']['terms']['size'] = max_buckets

  # 'size' => 0: only the aggregation is needed, not the hits
  req = {
    'query' => qu,
    'aggs' => ag,
    'size' => 0
  }

  # run the search
  url = @map[:case] + '/_search'
  body = JSON.generate(req)
  head = { 'Content-Type' => 'application/json' }
  resp = @es.run_request(:get, url, body, head)
  raise 'search failed' unless resp.success?

  # extract tags
  buckets = JSON.parse(resp.body)['aggregations']['tags']['buckets']
  list = buckets.map do |hh|
    { object: { tag: hh['key'], count: hh['doc_count'] } }
  end

  {
    query: query,
    list: list.sort_by { |item| item[:object][:tag] }
  }
end
case_write(cid, item)

(see Cache#case_write)

# File lib/icfs/cache_elastic.rb, line 239
# (see Cache#case_write)
#
# Writes +item+ as the :case document whose ID is the case ID itself.
def case_write(cid, item)
  doc_type = :case
  _write(doc_type, cid, item)
end
current_read(cid)

(see Cache#current_read)

# File lib/icfs/cache_elastic.rb, line 215
# (see Cache#current_read)
#
# Reads the :current document whose ID is the case ID itself.
def current_read(cid)
  doc_type = :current
  _read(doc_type, cid)
end
current_write(cid, item)

(see Cache#current_write)

# File lib/icfs/cache_elastic.rb, line 223
# (see Cache#current_write)
#
# Writes +item+ as the :current document whose ID is the case ID itself.
def current_write(cid, item)
  doc_type = :current
  _write(doc_type, cid, item)
end
entry_read(cid, enum)

(see Cache#entry_read)

# File lib/icfs/cache_elastic.rb, line 438
# (see Cache#entry_read)
#
# Reads the :entry document whose ID is "<cid>.<enum>".
def entry_read(cid, enum)
  doc_id = format('%s.%d', cid, enum)
  _read(:entry, doc_id)
end
entry_search(query)

(see Cache#entry_search)

# File lib/icfs/cache_elastic.rb, line 467
# (see Cache#entry_search)
#
# Search entries using full-text terms and exact/range filters.
#
# @param query [Hash] reads:
#   - :title and :content (full text, scored)
#   - :tags, :caseid, :after, :before, :action, :index, :stat and :credit
#     (filters)
#   - :sort ('time_desc', 'time_asc' or nil)
#   - the paging keys consumed by _page
# @return [Hash] the result of _results with the ResultsEntry fields
# @raise [RuntimeError] 'search failed' when the search request fails
def entry_search(query)

  # scored full-text clauses
  scored = [
    _query_match('title', query[:title]),
    _query_match('content', query[:content]),
  ].compact

  # unscored exact/range clauses
  filters = [
    _query_term('tags', query[:tags]),
    _query_term('caseid', query[:caseid]),
    _query_times('time', query[:after], query[:before]),
    _query_term('action', query[:action]),
    _query_term('index', query[:index]),
  ].compact

  # stat name and credit are matched inside one nested 'stats' query
  stat_terms = [
    _query_term('stats.name', query[:stat]),
    _query_term('stats.credit', query[:credit]),
  ].compact
  case stat_terms.size
  when 0
    # no stats criteria
  when 1
    filters << _query_nested('stats', stat_terms.first)
  else
    filters << _query_nested('stats', _query_bool(nil, stat_terms, nil, nil))
  end

  req = { 'query' => _query_bool(scored, filters, nil, nil) }

  # highlight whichever full-text fields were searched
  hl_fields = {}
  %w[title content].each { |fld| hl_fields[fld] = {} if query[fld.to_sym] }
  req['highlight'] = { 'fields' => hl_fields } unless hl_fields.empty?

  # Explicit sort order. With no :sort and no full-text terms, show newest
  # first; with full-text terms, leave ES's relevance ordering.
  # NOTE(review): 'time_asc' still breaks ties on _id descending; confirm
  # that is intended.
  newest_first = [{ 'time' => 'desc' }, { '_id' => 'desc' }]
  order =
    case query[:sort]
    when 'time_desc' then newest_first
    when 'time_asc' then [{ 'time' => 'asc' }, { '_id' => 'desc' }]
    when nil then (query[:title] || query[:content]) ? nil : newest_first
    end
  req['sort'] = order if order

  # adds 'size' and 'from'
  _page(query, req)

  # run the search
  resp = @es.run_request(
    :get,
    @map[:entry] + '/_search',
    JSON.generate(req),
    { 'Content-Type' => 'application/json' }
  )
  raise 'search failed' unless resp.success?

  _results(resp, query, ResultsEntry)
end
entry_tags(query)

(see Cache#entry_tags)

# File lib/icfs/cache_elastic.rb, line 1028
# (see Cache#entry_tags)
#
# Count the tags on the entries of one case.
#
# @param query [Hash] reads :caseid
# @return [Hash] :query (the argument) and :list, an Array of
#   { object: { caseid:, tag:, count: } } sorted by tag
# @raise [RuntimeError] 'search failed' when the search request fails
def entry_tags(query)

  # A terms aggregation returns only its top 10 buckets unless 'size' is
  # set, so tags were silently dropped. Ask for up to max_buckets instead.
  # NOTE(review): 10_000 is an assumed ceiling; it must stay within the
  # cluster's search.max_buckets limit.
  max_buckets = 10_000
  ag = _agg_terms('tags', 'tags', nil)
  ag['tags']['terms']['size'] = max_buckets

  # NOTE(review): a nil :caseid yields a constant_score with a nil filter,
  # which ES will presumably reject; callers appear to always supply it
  qu = _query_constant(_query_term('caseid', query[:caseid]))
  req = {
    'query' => qu,
    'aggs' => ag,
    'size' => 0
  }

  # run the search
  url = @map[:entry] + '/_search'
  body = JSON.generate(req)
  head = { 'Content-Type' => 'application/json' }
  resp = @es.run_request(:get, url, body, head)
  raise 'search failed' unless resp.success?

  # extract tags
  buckets = JSON.parse(resp.body)['aggregations']['tags']['buckets']
  list = buckets.map do |hh|
    {
      object: {
        caseid: query[:caseid],
        tag: hh['key'],
        count: hh['doc_count'],
      }
    }
  end

  {
    query: query,
    list: list.sort_by { |item| item[:object][:tag] }
  }
end
entry_write(cid, enum, item)

(see Cache#entry_write)

# File lib/icfs/cache_elastic.rb, line 446
# (see Cache#entry_write)
#
# Writes +item+ as the :entry document whose ID is "<cid>.<enum>".
def entry_write(cid, enum, item)
  doc_id = format('%s.%d', cid, enum)
  _write(:entry, doc_id, item)
end
index_read(cid, xnum)

(see Cache#index_read)

# File lib/icfs/cache_elastic.rb, line 652
# (see Cache#index_read)
#
# Reads the :index document whose ID is "<cid>.<xnum>".
def index_read(cid, xnum)
  doc_id = format('%s.%d', cid, xnum)
  _read(:index, doc_id)
end
index_tags(query)

(see Cache#index_tags)

# File lib/icfs/cache_elastic.rb, line 737
# (see Cache#index_tags)
#
# Count the tags on the indexes of one case.
#
# @param query [Hash] reads :caseid
# @return [Hash] :query (the argument) and :list, an Array of
#   { object: { caseid:, tag:, count: } } sorted by tag
# @raise [RuntimeError] 'search failed' when the search request fails
def index_tags(query)

  # A terms aggregation returns only its top 10 buckets unless 'size' is
  # set, so tags were silently dropped. Ask for up to max_buckets instead.
  # NOTE(review): 10_000 is an assumed ceiling; it must stay within the
  # cluster's search.max_buckets limit.
  max_buckets = 10_000
  ag = _agg_terms('tags', 'tags', nil)
  ag['tags']['terms']['size'] = max_buckets

  # NOTE(review): a nil :caseid yields a constant_score with a nil filter,
  # which ES will presumably reject; callers appear to always supply it
  qu = _query_constant(_query_term('caseid', query[:caseid]))
  req = {
    'query' => qu,
    'aggs' => ag,
    'size' => 0
  }

  # run the search
  url = @map[:index] + '/_search'
  body = JSON.generate(req)
  head = { 'Content-Type' => 'application/json' }
  resp = @es.run_request(:get, url, body, head)
  raise 'search failed' unless resp.success?

  # extract tags
  buckets = JSON.parse(resp.body)['aggregations']['tags']['buckets']
  list = buckets.map do |hh|
    {
      object: {
        caseid: query[:caseid],
        tag: hh['key'],
        count: hh['doc_count'],
      }
    }
  end

  {
    query: query,
    list: list.sort_by { |item| item[:object][:tag] }
  }
end
index_write(cid, xnum, item)

(see Cache#index_write)

# File lib/icfs/cache_elastic.rb, line 644
# (see Cache#index_write)
#
# Writes +item+ as the :index document whose ID is "<cid>.<xnum>".
def index_write(cid, xnum, item)
  doc_id = format('%s.%d', cid, xnum)
  _write(:index, doc_id, item)
end
lock_release(cid)

(see Cache#lock_release)

# File lib/icfs/cache_elastic.rb, line 203
# (see Cache#lock_release)
#
# Deletes the lock document for +cid+ from the :lock index.
#
# @return [nil]
# @raise [RuntimeError] if the delete request fails
def lock_release(cid)
  path = format('%s/_doc/%s', @map[:lock], CGI.escape(cid))
  released = @es.run_request(:delete, path, '', {}).success?
  raise('Elasticsearch lock release failed: %s' % cid) unless released
end
lock_take(cid)

(see Cache#lock_take)

# File lib/icfs/cache_elastic.rb, line 180
# (see Cache#lock_take)
#
# Tries to create the lock document for +cid+ in the :lock index. It uses
# the _create endpoint and records this client's "host:pid" name.
#
# @return [true] once a create request succeeds
# @raise [RuntimeError] after five failed attempts
def lock_take(cid)
  url = format('%s/_doc/%s/_create', @map[:lock], CGI.escape(cid))
  head = { 'Content-Type' => 'application/json' }.freeze
  json = '{"client":"%s"}' % @name

  # up to five attempts, pausing 0.1s after each failure
  5.times do
    return true if @es.run_request(:put, url, json, head).success?
    sleep(0.1)
  end

  raise('Elasticsearch lock take failed: %s' % cid)
end
log_read(cid, lnum)

(see Cache#log_read)

# File lib/icfs/cache_elastic.rb, line 779
# (see Cache#log_read)
#
# Reads the :log document whose ID is "<cid>.<lnum>".
def log_read(cid, lnum)
  doc_id = format('%s.%d', cid, lnum)
  _read(:log, doc_id)
end
log_write(cid, lnum, item)

(see Cache#log_write)

# File lib/icfs/cache_elastic.rb, line 787
# (see Cache#log_write)
#
# Writes +item+ as the :log document whose ID is "<cid>.<lnum>".
def log_write(cid, lnum, item)
  doc_id = format('%s.%d', cid, lnum)
  _write(:log, doc_id, item)
end
stats(query)

(see Cache#stats)

# File lib/icfs/cache_elastic.rb, line 958
# (see Cache#stats)
#
# Summarize entry stats per stat name: sum, count, min and max of
# stats.value. Optionally restricted to one credit, one case and a time
# range.
#
# @param query [Hash] reads :credit, :caseid, :after and :before; each is
#   optional
# @return [Hash] :query (the argument) and :list, an Array of
#   { object: { stat:, sum:, count:, min:, max: } } in bucket order
# @raise [RuntimeError] 'search failed' when the search request fails
def stats(query)

  # A terms aggregation returns only its top 10 buckets unless 'size' is
  # set, so stat names were silently dropped. Ask for up to max_buckets.
  # NOTE(review): 10_000 is an assumed ceiling; it must stay within the
  # cluster's search.max_buckets limit.
  max_buckets = 10_000
  ag = _agg_stats('vals', 'stats.value')
  ag = _agg_terms('stats', 'stats.name', ag)
  ag['stats']['terms']['size'] = max_buckets
  if query[:credit]
    cd = _query_term('stats.credit', query[:credit])
    ag = _agg_filter('credit', cd, ag)
  end
  ag = _agg_nested('nested', 'stats', ag)

  # build the query
  filt = [
    _query_term('caseid', query[:caseid]),
    _query_times('time', query[:after], query[:before]),
  ].compact
  qu = _query_bool(nil, filt, nil, nil)

  # the request; 'size' => 0 because only the aggregations are needed
  req = {
    'query' => qu,
    'aggs' => ag,
    'size' => 0,
  }

  # run the search
  url = @map[:entry] + '/_search'
  body = JSON.generate(req)
  head = { 'Content-Type' => 'application/json' }
  resp = @es.run_request(:get, url, body, head)
  raise 'search failed' unless resp.success?

  # extract stats; the credit filter adds one aggregation level
  nested = JSON.parse(resp.body)['aggregations']['nested']
  nested = nested['credit'] if query[:credit]
  list = nested['stats']['buckets'].map do |hh|
    vals = hh['vals']
    {
      object: {
        stat: hh['key'],
        sum: vals['sum'],
        count: vals['count'],
        min: vals['min'],
        max: vals['max'],
      }
    }
  end

  # return the results
  {
    query: query,
    list: list
  }
end

Private Instance Methods

_page(query, req)

Do paging

@param query [Hash] the query
@param req [Hash] the constructed ES request

# File lib/icfs/cache_elastic.rb, line 417
# Apply paging to a search request.
#
# @param query [Hash] reads :size (page size) and :page (1-based page
#   number); both are converted with to_i
# @param req [Hash] the Elasticsearch request; 'size' and 'from' are set
#   in place
# @return [Integer] the 'from' offset that was set
def _page(query, req)

  # A missing, zero or negative size falls back to DefaultSize.
  # Previously a negative size was sent through and ES rejected the search.
  size = query[:size] ? query[:size].to_i : 0
  size = DefaultSize if size < 1

  # pages start at 1; anything smaller would produce a negative 'from'
  page = query[:page] ? query[:page].to_i : 0
  page = 1 if page < 1

  req['size'] = size
  req['from'] = (page - 1) * size
end
_results(resp, query, fields=nil) { |src| ... }

Process search results

@param resp [Hash] the response from Elasticsearch
@param query [Hash] the original request
@param fields [Hash] Fields to return
@yield [src] The source object
@yieldreturn [Hash] the search result object

# File lib/icfs/cache_elastic.rb, line 328
# Convert an Elasticsearch search response into the cache's result format.
#
# @param resp [#body] the HTTP response; its body is the search response JSON
# @param query [Hash] the original request; it is echoed back, and :size is
#   used for the reported page size
# @param fields [Hash, nil] maps each output key to either:
#   - a _source field name, or
#   - [source_field, option, extra], where option is one of:
#     :sub (the value at key +extra+ inside the source value, or 0 for nil),
#     :size (length, or 0 for nil), :zero (0 for nil), :empty ([] for nil).
#   When nil, the block builds each result object instead.
# @yield [src] the _source of each hit (only when +fields+ is nil)
# @yieldreturn [Object] the search result object
# @return [Hash] :query, :hits (total hit count), :size and :list of
#   { score:, snippet:, object: }
# @raise [ArgumentError] for an unknown field option
def _results(resp, query, fields=nil)

  # page size: missing, zero or negative means DefaultSize
  size = query[:size] ? query[:size].to_i : 0
  size = DefaultSize if size < 1

  rh = JSON.parse(resp.body)

  # ES 7+ reports hits.total as {"value" => n, "relation" => ...};
  # earlier versions report a bare integer. Normalize to the integer.
  total = rh['hits']['total']
  total = total['value'] if total.is_a?(Hash)

  results = {
    query: query,
    hits: total,
    size: size,
  }

  # process each result
  results[:list] = rh['hits']['hits'].map do |hh|
    src = hh['_source']
    hl = hh['highlight']

    # Concatenate every highlight fragment. join keeps the fragments' own
    # (UTF-8) encoding; an empty String.new buffer is ASCII-8BIT.
    snip = hl ? hl.values.flatten(1).join : nil

    if fields
      obj = {}
      fields.each do |key, spec|
        if spec.is_a?(Array)
          val = src[spec[0]]
          obj[key] =
            case spec[1]
            when :sub   then val.nil? ? 0 : val[spec[2]]
            when :size  then val.nil? ? 0 : val.size
            when :zero  then val.nil? ? 0 : val
            when :empty then val.nil? ? [] : val
            else raise(ArgumentError, 'Not a valid field option')
            end
        else
          obj[key] = src[spec]
        end
      end
    else
      # let the caller build the search object from the source
      obj = yield src
    end

    { score: hh['_score'], snippet: snip, object: obj }
  end

  results
end