class Xcflushd::Storage

The error handling could be improved to try to avoid losing reports. However, there are trade-offs to be made. Complex error handling can complicate the code considerably. Also, there is no guarantee that the code in the rescue clauses will execute correctly. For example, if an smembers operation fails because Redis is not accessible, and the error handling consists of performing other Redis operations, that error handling could fail too. Some characteristics of Redis, like the absence of rollbacks, limit what we can do in case of error. In the future, we might explore other options, like Lua scripts or keeping a journal (in Redis or on disk).

Constants

CLEANUP_ERROR
REDIS_BATCH_KEYS

Some Redis operations might block the server for a long time if they need to operate on big collections of keys or values. For that reason, when using pipelines, instead of sending all the keys in a single pipeline, we send them in batches. If a batch is too big, we might block the server for a long time. If it is too small, we waste time on network round-trips to the server.
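
For illustration, here is the batching pattern in isolation (a minimal sketch, assuming a redis-rb client in storage; the keys array is illustrative):

keys = ['report_key_1', 'report_key_2'] # illustrative
keys.each_slice(REDIS_BATCH_KEYS) do |keys_slice|
  # One pipeline per batch: a single round-trip per batch, but a bounded
  # amount of work for the server.
  storage.pipelined do
    keys_slice.each { |key| storage.del(key) }
  end
end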

RETRIEVING_REPORTS_ERROR
SOME_REPORTS_MISSING_ERROR

Attributes

logger[R]
storage[R]
storage_keys[R]

Public Class Methods

new(storage, logger, storage_keys)
# File lib/xcflushd/storage.rb, line 40
def initialize(storage, logger, storage_keys)
  @storage = storage
  @logger = logger
  @storage_keys = storage_keys
end
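
A hypothetical instantiation (the Redis URL, the logger destination, and the Xcflushd::StorageKeys helper below are assumptions for illustration):

storage = Xcflushd::Storage.new(Redis.new(url: 'redis://localhost:6379'),
                                Logger.new(STDOUT),
                                Xcflushd::StorageKeys)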

Public Instance Methods

renew_auths(service_id, credentials, authorizations, auth_ttl)
# File lib/xcflushd/storage.rb, line 73
def renew_auths(service_id, credentials, authorizations, auth_ttl)
  hash_key = hash_key(:auth, service_id, credentials)

  authorizations.each_slice(REDIS_BATCH_KEYS) do |authorizations_slice|
    # Array with the hash key and all the sorted key-values
    hmset_args = [hash_key]

    authorizations_slice.each do |metric, auth|
      hmset_args << metric
      hmset_args << auth_value(auth)
    end

    storage.hmset(*hmset_args)
  end

  set_auth_validity(service_id, credentials, auth_ttl)

rescue Redis::BaseError
  raise RenewAuthError.new(service_id, credentials)
end
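
A hedged usage sketch. The authorizations argument maps metric names to objects that respond to #authorized? and #reason (the interface auth_value relies on); the stand-in class and the credentials hash below are illustrative only:

# Illustrative stand-in with the interface auth_value expects.
FakeAuth = Struct.new(:authorized, :reason) do
  def authorized?
    authorized
  end
end

storage.renew_auths('some_service_id',
                    { user_key: 'uk' },
                    { 'hits' => FakeAuth.new(true, nil),
                      'custom' => FakeAuth.new(false, 'limits_exceeded') },
                    300) # auth_ttl in seconds (applied via Redis EXPIRE)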

report(reports)
# File lib/xcflushd/storage.rb, line 94
def report(reports)
  reports.each do |report|
    increase_usage(report)
    add_to_set_keys_cached_reports(report)
  end
end
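
The expected shape of the reports argument, based on the keys the private helpers read (service_id, credentials, and a usage hash of metric => value); the concrete values are illustrative:

storage.report([
  { service_id: 'some_service_id',
    credentials: { user_key: 'uk' },
    usage: { 'hits' => 10, 'custom' => 2 } }
])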

reports_to_flush()

This also performs a cleanup of the reports to be flushed. We can decide later whether it is better to leave that responsibility to the caller of the method.

Returns an array of hashes, each containing a service_id, credentials, and a usage. The usage is another hash where the keys are the metrics and the values are guaranteed to respond to to_i and to_s.

# File lib/xcflushd/storage.rb, line 53
def reports_to_flush
  # The Redis rename command overwrites the key with the new name if it
  # exists. This means that if the rename operation fails in one flush
  # cycle and succeeds in a later one, the data that the key had in the
  # first flush cycle will be lost.
  # For that reason, every time we need to rename a key, we use a unique
  # suffix. This way, when the rename operation fails, the key will not
  # be overwritten later, and we will be able to recover its content.
  suffix = suffix_for_unique_naming

  report_keys = report_keys_to_flush(suffix)
  if report_keys.empty?
    logger.warn "No reports available to flush"
    report_keys
  else
    reports(report_keys, suffix)
  end
end
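
An illustrative return value (usage values come from hgetall, so they are strings, which satisfies the to_i/to_s guarantee; the credentials are shown as a hash for illustration):

[{ service_id: 'some_service_id',
   credentials: { user_key: 'uk' },
   usage: { 'hits' => '42' } }]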

Private Instance Methods

add_to_set_keys_cached_reports(report)
# File lib/xcflushd/storage.rb, line 241
def add_to_set_keys_cached_reports(report)
  hash_key = hash_key(:report, report[:service_id], report[:credentials])
  storage.sadd(set_keys_cached_reports, hash_key)
end

auth_value(auth)
# File lib/xcflushd/storage.rb, line 246
def auth_value(auth)
  if auth.authorized?
    '1'.freeze
  else
    auth.reason ? "0:#{auth.reason}" : '0'.freeze
  end
end
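
For example, a granted authorization is stored as '1', a denied one without a reason as '0', and a denied one with reason limits_exceeded as '0:limits_exceeded'.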

delete(keys)
# File lib/xcflushd/storage.rb, line 195
def delete(keys)
  tries ||= 3
  storage.del(keys)
rescue Redis::BaseError
  # Failing to delete certain keys could be problematic. That's why we
  # retry in case the error is temporary, like a network hiccup.
  #
  # When we rename keys, we give them a unique suffix so they are not
  # overwritten in the next cycle and we can retrieve their content
  # later. On the other hand, when we can retrieve their content
  # successfully, we delete them. The problem is that the delete operation
  # can fail. When trying to recover contents of keys that failed to be
  # renamed we'll not be able to distinguish these 2 cases:
  # 1) The key is there because we decided not to delete it to retrieve
  #    its content later.
  # 2) The key is there because the delete operation failed.
  # We could take a look at the logs to figure out what happened, but of
  # course that is not an ideal solution.
  if tries > 0
    tries -= 1
    sleep(0.1)
    retry
  else
    logger.error("#{CLEANUP_ERROR} Keys: #{keys}")
  end
end

flushing_report_keys(suffix)
# File lib/xcflushd/storage.rb, line 130
def flushing_report_keys(suffix)
  res = storage.smembers(set_keys_flushing_reports(suffix))
rescue Redis::BaseError
  logger.error(RETRIEVING_REPORTS_ERROR)
  []
else
  # We only delete the set if there is not an error. If there is an error,
  # it's not deleted, so it can be recovered later.
  delete([set_keys_flushing_reports(suffix)])
  res
end
hash_key(type, service_id, credentials)
# File lib/xcflushd/storage.rb, line 262
def hash_key(type, service_id, credentials)
  storage_keys.send("#{type}_hash_key", service_id, credentials)
end
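
For example, hash_key(:auth, service_id, credentials) dispatches to storage_keys.auth_hash_key(service_id, credentials), and hash_key(:report, ...) to storage_keys.report_hash_key(...).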

increase_usage(report)
# File lib/xcflushd/storage.rb, line 230
def increase_usage(report)
  hash_key = hash_key(:report, report[:service_id], report[:credentials])

  report[:usage].each_slice(REDIS_BATCH_KEYS) do |usages|
    usages.each do |usage|
      metric, value = usage
      storage.hincrby(hash_key, metric, value)
    end
  end
end

rename(keys)
# File lib/xcflushd/storage.rb, line 177
def rename(keys)
  keys.each_slice(REDIS_BATCH_KEYS) do |keys_slice|
    begin
      storage.pipelined do
        keys_slice.each do |old_name, new_name|
          storage.rename(old_name, new_name)
        end
      end
    rescue Redis::BaseError
      # The cached reports will not be reported now, but they will not be
      # lost. They will be reported next time there are hits for that
      # specific metric.
      # We cannot know which ones failed because we are using a pipeline.
      logger.warn(SOME_REPORTS_MISSING_ERROR)
    end
  end
end

report_keys_to_flush(suffix)
# File lib/xcflushd/storage.rb, line 105
def report_keys_to_flush(suffix)
  begin
    return [] if storage.scard(set_keys_cached_reports) == 0
    storage.rename(set_keys_cached_reports,
                   set_keys_flushing_reports(suffix))
  rescue Redis::BaseError
    # We could not even start the process of getting the reports, so just
    # log an error and return [].
    logger.error(RETRIEVING_REPORTS_ERROR)
    return []
  end

  flushing_reports = flushing_report_keys(suffix)

  keys_with_flushing_prefix = flushing_reports.map do |key|
    storage_keys.name_key_to_flush(key, suffix)
  end

  # Hash with old names as keys and new ones as values
  key_names = Hash[flushing_reports.zip(keys_with_flushing_prefix)]
  rename(key_names)

  key_names.values
end

reports(keys_to_flush, suffix)

Returns a report (hash with service_id, credentials, and usage) for each of the keys received.

# File lib/xcflushd/storage.rb, line 144
def reports(keys_to_flush, suffix)
  result = []

  keys_to_flush.each_slice(REDIS_BATCH_KEYS) do |keys|
    begin
      usages = storage.pipelined { keys.each { |k| storage.hgetall(k) } }
    rescue Redis::BaseError
      # The reports in a batch where hgetall failed will not be reported
      # now, but they will not be lost. The keys will not be deleted, so
      # we will be able to retrieve them later and retry.
      # We cannot know which ones failed because we are using a pipeline.
      logger.error(SOME_REPORTS_MISSING_ERROR)
    else
      keys.each_with_index do |key, i|
        # hgetall returns {} for keys that do not exist. That can happen
        # for 2 reasons:
        # 1) Apicast-xc does not guarantee that a key in the set of cached
        # reports will always exist.
        # 2) We failed to rename the key in the previous step.
        unless usages[i].empty?
          service_id, creds = storage_keys.service_and_creds(key, suffix)
          result << { service_id: service_id,
                      credentials: creds,
                      usage: usages[i] }
        end
      end
      delete(keys)
    end
  end

  result
end

set_auth_validity(service_id, credentials, auth_ttl)
# File lib/xcflushd/storage.rb, line 222
def set_auth_validity(service_id, credentials, auth_ttl)
  # Redis does not allow us to set a TTL for hash key fields. TTLs can only
  # be applied to the key containing the hash. This is not a problem
  # because we always renew all the metrics of an application at the same
  # time.
  storage.expire(hash_key(:auth, service_id, credentials), auth_ttl)
end

set_keys_cached_reports()
# File lib/xcflushd/storage.rb, line 266
def set_keys_cached_reports
  storage_keys::SET_KEYS_CACHED_REPORTS
end

set_keys_flushing_reports(suffix)
# File lib/xcflushd/storage.rb, line 258
def set_keys_flushing_reports(suffix)
  "#{storage_keys::SET_KEYS_FLUSHING_REPORTS}#{suffix}"
end

suffix_for_unique_naming()
# File lib/xcflushd/storage.rb, line 254
def suffix_for_unique_naming
  "_#{Time.now.utc.strftime('%Y%m%d%H%M%S'.freeze)}"
end
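
For example, a flush cycle that starts at 2017-01-31 10:20:30 UTC uses the suffix _20170131102030.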