class Xcflushd::PriorityAuthRenewer
Apart from flushing all the cached reports and renewing the authorizations periodically, we need to provide a mechanism to renew a specific auth at any time. The information needed is the combination of service, application credentials and metric.
When the client looks for the auth of a combination in the cache, it might not be there. It could be an authorization that has never been cached or one that has expired. In that case, we need to provide a way to check a specific authorization without waiting for the next flush cycle.
We use Redis publish/subscribe to solve this problem. We use 2 different type of channels:
1) Auth requests channel. It's the channel where the client specifies the combinations that need to be checked. xcflushd is subscribed to the channel. There is only one channel of this type. 2) Responses channel. Every time there's a request for a specific combination, a channel of this type is created. The client is subscribed to this channel, and xcflushd will publish the authorization status once it gets it from 3scale.
Constants
- PUBLISH_WAIT_TIMES
Attributes
Public Class Methods
We need two separate Redis clients: one for subscribing to a channel and the other one to publish to different channels. It is specified in the Redis website: redis.io/topics/pubsub
# File lib/xcflushd/priority_auth_renewer.rb, line 29 def initialize(authorizer, storage, redis_pub, redis_sub, auth_ttl, logger, threads) @authorizer = authorizer @storage = storage @redis_pub = redis_pub @redis_sub = redis_sub @auth_ttl = auth_ttl @logger = logger # We can receive several requests to renew the authorization of a # combination while we are already renewing it. We want to avoid # performing several calls to 3scale asking for the same thing. For that # reason, we use a map to keep track of the combinations that we are # renewing. # This map is updated from different threads. We use Concurrent::Map to # ensure thread-safety. @current_auths = Concurrent::Map.new @thread_pool = Concurrent::FixedThreadPool.new(threads) end
Public Instance Methods
# File lib/xcflushd/priority_auth_renewer.rb, line 50 def shutdown @thread_pool.shutdown end
# File lib/xcflushd/priority_auth_renewer.rb, line 62 def start begin subscribe_to_requests_channel rescue StandardError => e logger.error("PriorityAuthRenewer can't subscribe to the requests "\ "channel - #{e.class} #{e.message} #{e.cause}") raise e end end
# File lib/xcflushd/priority_auth_renewer.rb, line 58 def terminate @thread_pool.kill end
# File lib/xcflushd/priority_auth_renewer.rb, line 54 def wait_for_termination(secs = nil) @thread_pool.wait_for_termination(secs) end
Private Instance Methods
Apart from renewing the auth of the combination received, we also renew all the metrics of the associated application. The reason is that to renew a single metric we need to perform one call to 3scale, and to renew all the limited metrics of an application we also need one. If the metric received does not have limits defined, we need to perform two calls, but still it is worth to renew all of them for that price.
Note: Some exceptions can be raised inside the futures that are executed by the thread pool. For example, when 3scale is not accessible, when renewing the cached authorizations fails, or when publishing to the response channels fails. Trying to recover from all those cases does not seem to be worth it. The request that published the message will wait for a response that will not arrive and eventually, it will timeout. However, if the request retries, it is likely to succeed, as the kind of errors listed above are (hopefully) temporary.
# File lib/xcflushd/priority_auth_renewer.rb, line 130 def async_renew_and_publish_task(channel_msg) Concurrent::Future.new(executor: thread_pool) do success = true begin combination = auth_channel_msg_2_combination(channel_msg) app_auths = app_authorizations(combination) renew(combination[:service_id], combination[:credentials], app_auths) metric_auth = app_auths[combination[:metric]] rescue StandardError # If we do not do rescue, we would not be able to process the same # message again. success = false ensure mark_auth_task_as_finished(channel_msg) end # We only publish a message when there aren't any errors. When # success is false, we could have renewed some auths, so this could # be more fine grained and ping the subscribers that are not interested # in the auths that failed. Also, as we do not publish anything when # there is an error, the subscriber waits until it timeouts. # This is good enough for now, but there is room for improvement. publish_auth(combination, metric_auth) if success end end
# File lib/xcflushd/priority_auth_renewer.rb, line 156 def auth_channel_msg_2_combination(msg) StorageKeys.pubsub_auth_msg_2_auth_info(msg) end
# File lib/xcflushd/priority_auth_renewer.rb, line 170 def channel_for_combination(combination) StorageKeys.pubsub_auths_resp_channel(combination[:service_id], combination[:credentials], combination[:metric]) end
# File lib/xcflushd/priority_auth_renewer.rb, line 224 def mark_auth_task_as_finished(channel_msg) current_auths.delete(channel_msg) end
# File lib/xcflushd/priority_auth_renewer.rb, line 199 def publish_auth(combination, authorization) publish_message channel_for_combination(combination), authorization_message(authorization) rescue => e logger.warn "cannot publish in priority channel " \ "for combination #{combination}: #{e}" end
# File lib/xcflushd/priority_auth_renewer.rb, line 184 def publish_message(channel, msg) wait = PUBLISH_WAIT_TIMES.each begin redis_pub.publish(channel, msg) rescue => e begin sleep wait.next rescue StopIteration raise e end retry end end
# File lib/xcflushd/priority_auth_renewer.rb, line 166 def renew(service_id, credentials, auths) storage.renew_auths(service_id, credentials, auths, auth_ttl) end
# File lib/xcflushd/priority_auth_renewer.rb, line 77 def subscribe_to_requests_channel redis_sub.subscribe(StorageKeys::AUTH_REQUESTS_CHANNEL) do |on| on.subscribe do |channel, _subscriptions| logger.info("PriorityAuthRenewer correctly subscribed to #{channel}") end on.message do |_channel, msg| begin # The renew and publish operations need to be done asynchronously. # Renewing the authorizations involves getting them from 3scale, # making networks requests, and also updating Redis. We cannot block # until we get all that done. That is why we need to treat the # messages received in the channel concurrently. unless currently_authorizing?(msg) async_renew_and_publish_task(msg).execute end rescue Concurrent::RejectedExecutionError => e # This error is raised when we try to submit a task to the thread # pool and it is rejected. # After we call shutdown() on the thread pool, this error will be # raised. We do not want to log errors in this case. unless thread_pool.shuttingdown? logger.error('Error while treating a message received in the '\ "requests channel: #{e.message}") end rescue StandardError => e # If we do not rescue from an exception raised while treating a # message, the redis client instance used stops receiving messages. # We need to make sure that we'll rescue in all cases. # Keep in mind that this will not rescue from exceptions raised in # async tasks because they are executed in different threads. logger.error('Error while treating a message received in the '\ "requests channel: #{e.message}") end end end end