module Kasket::ReadMixin

Public Class Methods

extended(base) click to toggle source
# File lib/kasket/read_mixin.rb, line 4
def self.extended(base)
  class << base
    alias_method :find_by_sql_without_kasket, :find_by_sql
    alias_method :find_by_sql, :find_by_sql_with_kasket
  end
end

Public Instance Methods

find_by_sql_with_kasket(*args) click to toggle source

*args can be replaced with (sql, *args) once we stop supporting Rails < 5.2

# File lib/kasket/read_mixin.rb, line 12
def find_by_sql_with_kasket(*args)
  sql = args[0]

  if use_kasket?
    query = if sql.respond_to?(:to_kasket_query)
      if ActiveRecord::VERSION::MAJOR < 5
        sql.to_kasket_query(self, args[1])
      else
        if ActiveRecord::VERSION::STRING < '5.2'
          sql.to_kasket_query(self, args[1].map(&:value_for_database))
        else
          sql.to_kasket_query(self)
        end
      end
    else
      kasket_parser.parse(sanitize_sql(sql))
    end
  end

  if query && has_kasket_index_on?(query[:index])
    if query[:key].is_a?(Array)
      find_by_sql_with_kasket_on_id_array(query[:key])
    else
      if value = Kasket.cache.read(query[:key])
        # Identified a specific edge case where memcached server returns 0x00 binary protocol response with no data
        # when the node is being rebooted which causes the Dalli memcached client to return a TrueClass object instead of nil
        # see: https://github.com/petergoldstein/dalli/blob/31dabf19d3dd94b348a00a59fe5a7b8fa80ce3ad/lib/dalli/server.rb#L520
        # and: https://github.com/petergoldstein/dalli/issues/390
        #
        # The code in this first condition of TrueClass === true  will
        # skip the kasket cache for these specific objects and go directly to SQL for retrieval.
        result_set = if value.is_a?(TrueClass)
          find_by_sql_without_kasket(*args)
        elsif value.is_a?(Array)
          filter_pending_records(find_by_sql_with_kasket_on_id_array(value))
        else
          filter_pending_records(Array.wrap(value).collect { |record| instantiate(record.dup) })
        end

        payload = {
          record_count: result_set.length,
          class_name: to_s
        }

        ActiveSupport::Notifications.instrument('instantiation.active_record', payload) { result_set }
      else
        store_in_kasket(query[:key], find_by_sql_without_kasket(*args))
      end
    end
  else
    find_by_sql_without_kasket(*args)
  end
end
find_by_sql_with_kasket_on_id_array(keys) click to toggle source
# File lib/kasket/read_mixin.rb, line 66
def find_by_sql_with_kasket_on_id_array(keys)
  begin
    key_attributes_map = Kasket.cache.read_multi(*keys)
  rescue RuntimeError => e
    # Elasticache Memcached has a bug where it returns a 0x00 binary protocol response with no data
    # during a reboot, causing the Dalli memcached client to throw a RuntimeError during a multi get
    # (https://github.com/petergoldstein/dalli/blob/v2.7.7/lib/dalli/server.rb#L148).
    # Fall back to the database when this happens.
    if e.message == "multi_response has completed"
      key_attributes_map = missing_records_from_db(keys)
    else
      raise
    end
  else
    found_keys, missing_keys = keys.partition {|k| key_attributes_map[k] }
    found_keys.each {|k| key_attributes_map[k] = instantiate(key_attributes_map[k].dup) }
    key_attributes_map.merge!(missing_records_from_db(missing_keys))
  end

  key_attributes_map.values.compact
end

Protected Instance Methods

filter_pending_records(records) click to toggle source
# File lib/kasket/read_mixin.rb, line 90
def filter_pending_records(records)
  if pending_records = Kasket.pending_records
    records.map { |record| pending_records.fetch(record, record) }.compact
  else
    records
  end
end
missing_records_from_db(missing_keys) click to toggle source
# File lib/kasket/read_mixin.rb, line 98
def missing_records_from_db(missing_keys)
  return {} if missing_keys.empty?

  id_key_map = Hash[missing_keys.map {|key| [key.split('=').last.to_i, key] }]

  found = without_kasket { where(id: id_key_map.keys).to_a }
  found.each(&:store_in_kasket)
  Hash[found.map {|record| [id_key_map[record.id], record] }]
end
store_in_kasket(key, records) click to toggle source
# File lib/kasket/read_mixin.rb, line 108
def store_in_kasket(key, records)
  if records.size == 1
    records.first.store_in_kasket(key)
  elsif records.empty?
    ActiveRecord::Base.logger.debug("[KASKET] would have stored an empty resultset") if ActiveRecord::Base.logger
  elsif records.size <= Kasket::CONFIGURATION[:max_collection_size]
    if records.all?(&:kasket_cacheable?)
      instance_keys = records.map(&:store_in_kasket)
      Kasket.cache.write(key, instance_keys)
    end
  end
  records
end