class Alephant::Publisher::Queue::RevalidateProcessor

Attributes

http_response_processor[R]
opts[R]
url_generator[R]

Public Class Methods

new(opts = nil, url_generator, http_response_processor) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 18
def initialize(opts = nil, url_generator, http_response_processor)
  @opts                    = opts
  @url_generator           = url_generator
  @http_response_processor = http_response_processor
end

Public Instance Methods

consume(message_collection) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 24
def consume(message_collection)
  return unless message_collection && message_collection.size > 0

  message = message_collection.first

  msg_body = message_content(message)

  # @TODO: This is not a http response but a data struct. We should look at renaming this.
  http_response = {
    renderer_id:   msg_body.fetch(:id),
    http_options:  msg_body,
    http_response: get(message),
    ttl:           http_response_processor.ttl(msg_body).to_s # @TODO: What happens if this is nil? Storage requires this to be a string.
  }

  http_message = build_http_message(message, ::JSON.generate(http_response))

  write(http_message)

  message.delete
  logger.info(event: 'SQSMessageDeleted', message_content: message_content(message), method: "#{self.class}#consume")

  cache.delete(inflight_message_key(message))
  logger.info(event: 'InFlightMessageDeleted', key: inflight_message_key(message), method: "#{self.class}#consume")
end

Private Instance Methods

build_http_message(message, http_response) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 82
def build_http_message(message, http_response)
  OpenStruct.new({
    body: http_response
  })
end
build_inflight_opts_hash(opts) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 65
def build_inflight_opts_hash(opts)
  opts_hash = Hash[opts['options'].map { |k, v| [k.to_sym, v] }]
  Crimp.signature(opts_hash)
end
cache() click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 75
def cache
  @cache ||= proc do
    endpoint = opts.cache.fetch(:elasticache_config_endpoint)
    Dalli::ElastiCache.new(endpoint).client
  end.call
end
get(message) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 88
def get(message)
  msg_content = message_content(message)
  url         = url_generator.generate(msg_content)

  logger.info(
    event:  'Sending HTTP GET request',
    url:    url,
    method: "#{self.class}#get"
  )

  res = Faraday.get(url)

  logger.info(
    event:  'HTTP request complete',
    url:    url,
    status: res.status,
    body:   res.body,
    method: "#{self.class}#get"
  )

  http_response_processor.process(msg_content, res.status, res.body)
end
inflight_message_key(message) click to toggle source

NOTE: If you change this, you'll need to change this in

`alephant-broker` also.
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 58
def inflight_message_key(message)
  opts = ::JSON.parse(message.body)
  version_cache_key(
    "inflight-#{opts['id']}/#{build_inflight_opts_hash(opts)}"
  )
end
message_content(message) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 111
def message_content(message)
  ::JSON.parse(message.body, symbolize_names: true)
end
version_cache_key(key) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 70
def version_cache_key(key)
  cache_version = opts.cache[:elasticache_cache_version]
  [key, cache_version].compact.join('_')
end
write(message) click to toggle source
# File lib/alephant/publisher/queue/revalidate_processor.rb, line 52
def write(message)
  RevalidateWriter.new(writer_config, message).run!
end