class Alephant::Publisher::Queue::RevalidateProcessor
Attributes
http_response_processor[R]
opts[R]
url_generator[R]
Public Class Methods
new(opts = nil, url_generator, http_response_processor)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 18 def initialize(opts = nil, url_generator, http_response_processor) @opts = opts @url_generator = url_generator @http_response_processor = http_response_processor end
Public Instance Methods
consume(message_collection)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 24 def consume(message_collection) return unless message_collection && message_collection.size > 0 message = message_collection.first msg_body = message_content(message) # @TODO: This is not a http response but a data struct. We should look at renaming this. http_response = { renderer_id: msg_body.fetch(:id), http_options: msg_body, http_response: get(message), ttl: http_response_processor.ttl(msg_body).to_s # @TODO: What happens if this is nil? Storage requires this to be a string. } http_message = build_http_message(message, ::JSON.generate(http_response)) write(http_message) message.delete logger.info(event: 'SQSMessageDeleted', message_content: message_content(message), method: "#{self.class}#consume") cache.delete(inflight_message_key(message)) logger.info(event: 'InFlightMessageDeleted', key: inflight_message_key(message), method: "#{self.class}#consume") end
Private Instance Methods
build_http_message(message, http_response)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 82 def build_http_message(message, http_response) OpenStruct.new({ body: http_response }) end
build_inflight_opts_hash(opts)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 65 def build_inflight_opts_hash(opts) opts_hash = Hash[opts['options'].map { |k, v| [k.to_sym, v] }] Crimp.signature(opts_hash) end
cache()
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 75 def cache @cache ||= proc do endpoint = opts.cache.fetch(:elasticache_config_endpoint) Dalli::ElastiCache.new(endpoint).client end.call end
get(message)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 88 def get(message) msg_content = message_content(message) url = url_generator.generate(msg_content) logger.info( event: 'Sending HTTP GET request', url: url, method: "#{self.class}#get" ) res = Faraday.get(url) logger.info( event: 'HTTP request complete', url: url, status: res.status, body: res.body, method: "#{self.class}#get" ) http_response_processor.process(msg_content, res.status, res.body) end
inflight_message_key(message)
click to toggle source
NOTE: If you change this, you'll need to change this in
`alephant-broker` also.
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 58 def inflight_message_key(message) opts = ::JSON.parse(message.body) version_cache_key( "inflight-#{opts['id']}/#{build_inflight_opts_hash(opts)}" ) end
message_content(message)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 111 def message_content(message) ::JSON.parse(message.body, symbolize_names: true) end
version_cache_key(key)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 70 def version_cache_key(key) cache_version = opts.cache[:elasticache_cache_version] [key, cache_version].compact.join('_') end
write(message)
click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 52 def write(message) RevalidateWriter.new(writer_config, message).run! end