class Xcflushd::PriorityAuthRenewer

Apart from flushing all the cached reports and renewing the authorizations periodically, we need to provide a mechanism to renew a specific auth at any time. The information needed is the combination of service, application credentials and metric.

When the client looks for the auth of a combination in the cache, it might not be there. It could be an authorization that has never been cached or one that has expired. In that case, we need to provide a way to check a specific authorization without waiting for the next flush cycle.

We use Redis publish/subscribe to solve this problem. We use two different types of channels:

1) Auth requests channel. It's the channel where the client specifies the
   combinations that need to be checked. xcflushd is subscribed to the
   channel. There is only one channel of this type.
2) Responses channel. Every time there's a request for a specific
   combination, a channel of this type is created. The client is
   subscribed to this channel, and xcflushd will publish the authorization
   status once it gets it from 3scale.

Constants

PUBLISH_WAIT_TIMES

Attributes

auth_ttl[R]
authorizer[R]
current_auths[R]
logger[R]
redis_pub[R]
redis_sub[R]
storage[R]
thread_pool[R]

Public Class Methods

new(authorizer, storage, redis_pub, redis_sub, auth_ttl, logger, threads)

We need two separate Redis clients: one for subscribing to a channel and another one for publishing to different channels. This requirement is documented on the Redis website: redis.io/topics/pubsub

# File lib/xcflushd/priority_auth_renewer.rb, line 29
def initialize(authorizer, storage, redis_pub, redis_sub,
               auth_ttl, logger, threads)
  @authorizer = authorizer
  @storage = storage
  @redis_pub = redis_pub
  @redis_sub = redis_sub
  @auth_ttl = auth_ttl
  @logger = logger

  # We can receive several requests to renew the authorization of a
  # combination while we are already renewing it. We want to avoid
  # performing several calls to 3scale asking for the same thing. For that
  # reason, we use a map to keep track of the combinations that we are
  # renewing.
  # This map is updated from different threads. We use Concurrent::Map to
  # ensure thread-safety.
  @current_auths = Concurrent::Map.new

  @thread_pool = Concurrent::FixedThreadPool.new(threads)
end

Public Instance Methods

shutdown()
# File lib/xcflushd/priority_auth_renewer.rb, line 50
def shutdown
  @thread_pool.shutdown
end
start()
# File lib/xcflushd/priority_auth_renewer.rb, line 62
def start
  begin
    subscribe_to_requests_channel
  rescue StandardError => e
    logger.error("PriorityAuthRenewer can't subscribe to the requests "\
                 "channel - #{e.class} #{e.message} #{e.cause}")
    raise e
  end
end
terminate()
# File lib/xcflushd/priority_auth_renewer.rb, line 58
def terminate
  @thread_pool.kill
end
wait_for_termination(secs = nil)
# File lib/xcflushd/priority_auth_renewer.rb, line 54
def wait_for_termination(secs = nil)
  @thread_pool.wait_for_termination(secs)
end

Private Instance Methods

app_authorizations(combination)
# File lib/xcflushd/priority_auth_renewer.rb, line 160
def app_authorizations(combination)
  authorizer.authorizations(combination[:service_id],
                            combination[:credentials],
                            [combination[:metric]])
end
async_renew_and_publish_task(channel_msg)

Apart from renewing the auth of the combination received, we also renew all the metrics of the associated application. The reason is that renewing a single metric takes one call to 3scale, and renewing all the limited metrics of an application also takes one. If the metric received does not have limits defined, we need to perform two calls, but it is still worth renewing all of them for that price.

Note: Some exceptions can be raised inside the futures that are executed by the thread pool. For example, when 3scale is not accessible, when renewing the cached authorizations fails, or when publishing to the response channels fails. Trying to recover from all those cases does not seem to be worth it. The request that published the message will wait for a response that will not arrive and will eventually time out. However, if the request retries, it is likely to succeed, as the kinds of errors listed above are (hopefully) temporary.

# File lib/xcflushd/priority_auth_renewer.rb, line 130
def async_renew_and_publish_task(channel_msg)
  Concurrent::Future.new(executor: thread_pool) do
    success = true
    begin
      combination = auth_channel_msg_2_combination(channel_msg)
      app_auths = app_authorizations(combination)
      renew(combination[:service_id], combination[:credentials], app_auths)
      metric_auth = app_auths[combination[:metric]]
    rescue StandardError
      # If we do not rescue, we would not be able to process the same
      # message again.
      success = false
    ensure
      mark_auth_task_as_finished(channel_msg)
    end

    # We only publish a message when there aren't any errors. When
    # success is false, we could have renewed some auths, so this could
    # be more fine grained and ping the subscribers that are not interested
    # in the auths that failed. Also, as we do not publish anything when
    # there is an error, the subscriber waits until it times out.
    # This is good enough for now, but there is room for improvement.
    publish_auth(combination, metric_auth) if success
  end
end
auth_channel_msg_2_combination(msg)
# File lib/xcflushd/priority_auth_renewer.rb, line 156
def auth_channel_msg_2_combination(msg)
  StorageKeys.pubsub_auth_msg_2_auth_info(msg)
end
authorization_message(authorization)
# File lib/xcflushd/priority_auth_renewer.rb, line 176
def authorization_message(authorization)
  if authorization.authorized?
    '1'.freeze
  else
    authorization.reason ? "0:#{authorization.reason}" : '0'.freeze
  end
end
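
The method above produces three possible payloads: '1', '0', or '0:' followed by a reason. As a rough illustration of how a subscriber might decode them, here is a minimal sketch. The helper name parse_auth_message and the returned hash layout are hypothetical and not part of xcflushd; the reason string used in the example is also made up.

```ruby
# Hypothetical client-side helper (not part of xcflushd).
# Decodes the payloads built by authorization_message:
#   '1'          -> authorized
#   '0'          -> denied, no reason given
#   '0:<reason>' -> denied, with a reason
def parse_auth_message(msg)
  # Split only on the first ':' so that a reason containing ':' stays whole.
  status, reason = msg.split(':', 2)
  { authorized: status == '1', reason: reason }
end

authorized = parse_auth_message('1')
denied = parse_auth_message('0:example_reason')
```

Splitting with a limit of 2 is a deliberate choice here: it keeps the parser correct even if a reason happens to contain a colon.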
channel_for_combination(combination)
# File lib/xcflushd/priority_auth_renewer.rb, line 170
def channel_for_combination(combination)
  StorageKeys.pubsub_auths_resp_channel(combination[:service_id],
                                        combination[:credentials],
                                        combination[:metric])
end
currently_authorizing?(channel_msg)
# File lib/xcflushd/priority_auth_renewer.rb, line 207
def currently_authorizing?(channel_msg)
  # A simple solution would be something like:
  # if !current_auths[channel_msg]
  #   current_auths[channel_msg] = true;
  #   perform_work
  #   current_auths.delete(channel_msg)
  # end
  # The problem is that the read/write is not atomic. Therefore, several
  # threads could enter the if at the same time repeating work. That is
  # why we use concurrent-ruby's Map#put_if_absent, which is atomic.

  # The value we set in the map is not relevant. #put_if_absent returns
  # nil when the key is not in the map, which means that we are not
  # currently authorizing it. That is all we care about.
  current_auths.put_if_absent(channel_msg, true) != nil
end
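
To make the atomicity argument in the comments above concrete, the following sketch imitates the put_if_absent contract with only the standard library. TinyAtomicMap and count_winners are hypothetical names introduced for illustration; xcflushd itself relies on Concurrent::Map from concurrent-ruby, not on this Mutex-based stand-in.

```ruby
# Stdlib-only stand-in for the put_if_absent contract (an assumption for
# illustration; the real code uses Concurrent::Map).
class TinyAtomicMap
  def initialize
    @lock = Mutex.new
    @data = {}
  end

  # Stores value and returns nil when key is absent. Otherwise returns the
  # existing value and leaves the map untouched. The check and the write
  # happen under one lock, so no two threads can both see "absent".
  def put_if_absent(key, value)
    @lock.synchronize do
      next @data[key] if @data.key?(key)

      @data[key] = value
      nil
    end
  end

  def delete(key)
    @lock.synchronize { @data.delete(key) }
  end
end

# Starts thread_count threads that all try to claim the same key and
# returns how many of them got nil back (that is, how many "won").
def count_winners(map, key, thread_count)
  threads = Array.new(thread_count) do
    Thread.new { map.put_if_absent(key, true).nil? }
  end
  threads.map(&:value).count(true)
end
```

However many threads race for the same message, exactly one should get nil back and go on to perform the renewal; the rest see an existing value and skip the work. Deleting the key afterwards, as mark_auth_task_as_finished does, lets the same message be claimed again.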
mark_auth_task_as_finished(channel_msg)
# File lib/xcflushd/priority_auth_renewer.rb, line 224
def mark_auth_task_as_finished(channel_msg)
  current_auths.delete(channel_msg)
end
publish_auth(combination, authorization)
# File lib/xcflushd/priority_auth_renewer.rb, line 199
def publish_auth(combination, authorization)
  publish_message channel_for_combination(combination),
                  authorization_message(authorization)
rescue => e
  logger.warn "cannot publish in priority channel " \
              "for combination #{combination}: #{e}"
end
publish_message(channel, msg)
# File lib/xcflushd/priority_auth_renewer.rb, line 184
def publish_message(channel, msg)
  wait = PUBLISH_WAIT_TIMES.each

  begin
    redis_pub.publish(channel, msg)
  rescue => e
    begin
      sleep wait.next
    rescue StopIteration
      raise e
    end
    retry
  end
end
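
The retry logic above walks an enumerator of wait times and re-raises the original error once the enumerator is exhausted. The sketch below isolates that pattern so it can be tried without Redis. EXAMPLE_WAIT_TIMES and with_waits are hypothetical names, and the wait values are invented for the example; the actual contents of PUBLISH_WAIT_TIMES are not shown on this page.

```ruby
# Hypothetical wait times in seconds; the real PUBLISH_WAIT_TIMES may differ.
EXAMPLE_WAIT_TIMES = [0.001, 0.002, 0.003].freeze

# Runs the block, passing it the attempt number. On failure, sleeps for the
# next wait time and retries. When the wait times run out, the error from
# the last attempt is re-raised.
def with_waits(wait_times)
  wait = wait_times.each # external enumerator; #next raises StopIteration at the end
  attempts = 0

  begin
    attempts += 1
    yield attempts
  rescue StandardError => e
    begin
      sleep wait.next
    rescue StopIteration
      raise e
    end
    retry
  end
end

# Simulated publisher that fails on the first two attempts.
result = with_waits(EXAMPLE_WAIT_TIMES) do |attempt|
  raise 'connection lost' if attempt < 3

  :published
end
```

With N wait times the block runs at most N + 1 times: one initial attempt plus one retry per wait time.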
renew(service_id, credentials, auths)
# File lib/xcflushd/priority_auth_renewer.rb, line 166
def renew(service_id, credentials, auths)
  storage.renew_auths(service_id, credentials, auths, auth_ttl)
end
subscribe_to_requests_channel()
# File lib/xcflushd/priority_auth_renewer.rb, line 77
def subscribe_to_requests_channel
  redis_sub.subscribe(StorageKeys::AUTH_REQUESTS_CHANNEL) do |on|
    on.subscribe do |channel, _subscriptions|
      logger.info("PriorityAuthRenewer correctly subscribed to #{channel}")
    end

    on.message do |_channel, msg|
      begin
        # The renew and publish operations need to be done asynchronously.
        # Renewing the authorizations involves getting them from 3scale,
        # making network requests, and also updating Redis. We cannot block
        # until we get all that done. That is why we need to treat the
        # messages received in the channel concurrently.
        unless currently_authorizing?(msg)
          async_renew_and_publish_task(msg).execute
        end
      rescue Concurrent::RejectedExecutionError => e
        # This error is raised when we try to submit a task to the thread
        # pool and it is rejected.
        # After we call shutdown() on the thread pool, this error will be
        # raised. We do not want to log errors in this case.
        unless thread_pool.shuttingdown?
          logger.error('Error while treating a message received in the '\
                       "requests channel: #{e.message}")
        end
      rescue StandardError => e
        # If we do not rescue from an exception raised while treating a
        # message, the redis client instance used stops receiving messages.
        # We need to make sure that we'll rescue in all cases.
        # Keep in mind that this will not rescue from exceptions raised in
        # async tasks because they are executed in different threads.
        logger.error('Error while treating a message received in the '\
                     "requests channel: #{e.message}")
      end
    end
  end
end