class LogStash::Inputs::Cloudflare

Public Instance Methods

_build_uri(endpoint, params) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 114
# Builds the request URI for a Cloudflare v4 API endpoint.
#
# endpoint - path appended verbatim after the "/client/v4" base
#            (e.g. "/zones")
# params   - key/value pairs that are form-encoded into the query string
#
# Returns the resulting URI object.
def _build_uri(endpoint, params)
  URI("https://api.cloudflare.com/client/v4#{endpoint}").tap do |target|
    target.query = URI.encode_www_form(params)
  end
end
_from_neither(metadata, params) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 184
# Seeds the request window when the previous run left neither a ray ID nor
# a timestamp behind.
#
# The window starts at metadata['default_start_time'] and spans
# @poll_interval. The same start is recorded as metadata['first_timestamp']
# and metadata['first_ray_id'] is cleared.
#
# metadata - state hash (mutated)
# params   - request parameter hash (mutated: 'start' and 'end')
def _from_neither(metadata, params)
  @logger.info('last_timestamp or last_ray_id from previous run NOT set')
  window_start = metadata['default_start_time']
  params['start'] = window_start
  params['end'] = window_start + @poll_interval
  metadata['first_ray_id'] = nil
  metadata['first_timestamp'] = window_start
end
_from_ray_id(metadata, params) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 163
# Seeds the request so it resumes from the ray ID stored by the previous
# run, asking for @batch_size events starting at that ID.
#
# The ray ID is also remembered as metadata['first_ray_id'] and
# metadata['first_timestamp'] is cleared.
#
# metadata - state hash (mutated)
# params   - request parameter hash (mutated: 'start_id' and 'count')
def _from_ray_id(metadata, params)
  previous_ray_id = metadata['last_ray_id']
  @logger.info("Previous ray_id detected: #{previous_ray_id}")
  params['start_id'] = previous_ray_id
  params['count'] = @batch_size
  metadata['first_ray_id'] = previous_ray_id
  metadata['first_timestamp'] = nil
end
_from_timestamp(metadata, params) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 173
# Seeds the request window from the timestamp stored by the previous run.
#
# The window starts at metadata['last_timestamp'] (epoch seconds) and spans
# @poll_interval. The start is also recorded as metadata['first_timestamp']
# and metadata['first_ray_id'] is cleared.
#
# metadata - state hash (mutated); 'last_timestamp' may be an Integer or a
#            String of digits
# params   - request parameter hash (mutated: 'start' and 'end')
def _from_timestamp(metadata, params)
  last_timestamp = metadata['last_timestamp']
  # DateTime.strptime only accepts a String, while process_entry and
  # continue_or_sleep store the timestamp as an Integer; stringify first so
  # both representations are accepted. The DateTime is used for logging only.
  dt_tstamp = DateTime.strptime(last_timestamp.to_s, '%s')
  @logger.info('last_timestamp from previous run detected: '\
               "#{last_timestamp} #{dt_tstamp}")
  params['start'] = last_timestamp.to_i
  params['end'] = params['start'] + @poll_interval
  metadata['first_ray_id'] = nil
  metadata['first_timestamp'] = params['start']
end
_process_response(response, uri, multi_line) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 120
# Validates an API response and returns its parsed content.
#
# response   - HTTP response; its body is extracted with response_body and
#              its code is compared against the string '200'
# uri        - the requested URI, only used when building the error
# multi_line - truthy to get every parsed line, falsy to get only the first
#
# Returns whatever parse_content produced (all of it, or element 0).
# Raises CloudflareAPIError for any status code other than '200'.
def _process_response(response, uri, multi_line)
  content = response_body(response)
  unless response.code == '200'
    raise CloudflareAPIError.new(uri.to_s, response, content),
          'Error calling Cloudflare API'
  end
  @logger.info("Received response from Cloudflare API (status_code: #{response.code})")
  parsed = parse_content(content)
  multi_line ? parsed : parsed[0]
end
_sleep_time() click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 256
# Pauses for @poll_time seconds before the next request to Cloudflare.
#
# The pause is taken as (@poll_time * 2) naps of half a second each rather
# than one long sleep.
def _sleep_time
  # The trailing space matters: the two literals are joined verbatim and
  # previously produced "...requesting datafrom Cloudflare again".
  @logger.info("Waiting #{@poll_time} seconds before requesting data "\
               'from Cloudflare again')
  (@poll_time * 2).times { sleep(0.5) }
end
cloudflare_api_call(endpoint, params, multi_line = false) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 132
# Performs an authenticated GET against the Cloudflare API over HTTPS.
#
# endpoint   - API path handed to _build_uri
# params     - query parameters handed to _build_uri
# multi_line - forwarded to _process_response (default: false)
#
# The connection uses @open_timeout / @read_timeout and the request carries
# @auth_email / @auth_key in the X-Auth-* headers.
#
# Returns the result of _process_response for the received response.
def cloudflare_api_call(endpoint, params, multi_line = false)
  uri = _build_uri(endpoint, params)
  @logger.info('Sending request to Cloudflare')
  headers = {
    'Accept-Encoding' => 'gzip',
    'X-Auth-Email' => @auth_email,
    'X-Auth-Key' => @auth_key
  }
  # Net::HTTP.start hands back the block's value, so the processed response
  # becomes this method's return value.
  Net::HTTP.start(uri.hostname, uri.port,
                  use_ssl: true,
                  open_timeout: @open_timeout,
                  read_timeout: @read_timeout) do |http|
    request = Net::HTTP::Get.new(uri.request_uri, headers)
    _process_response(http.request(request), uri, multi_line)
  end
end
cloudflare_data(zone_id, metadata) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 207
# Fetches the next batch of request-log entries for a zone.
#
# zone_id  - Cloudflare zone identifier used in the endpoint path
# metadata - state hash; passed to cloudflare_params, which derives the
#            request parameters from it
#
# API errors (CloudflareAPIError) and timeouts (Timeout::Error) are logged
# and treated as "no entries" instead of being propagated.
#
# Returns the entries from the API, or [] on error/timeout.
def cloudflare_data(zone_id, metadata)
  @logger.info("cloudflare_data metadata: '#{metadata}'")
  params = cloudflare_params(metadata)
  @logger.info("Using params #{params}")
  begin
    # multi_line is a positional parameter of cloudflare_api_call; passing
    # `multi_line: true` would hand over a Hash that merely happens to be
    # truthy, so pass the boolean itself.
    entries = cloudflare_api_call("/zones/#{zone_id}/logs/requests",
                                  params, true)
  rescue CloudflareAPIError => err
    err.errors.each do |error|
      @logger.error("Cloudflare error code: #{error['code']}: "\
                    "#{error['message']}")
    end
    entries = []
  rescue Timeout::Error
    @logger.error('Cloudflare API timed out. Consider adjusting the read_timeout.')
    entries = []
  end
  @logger.info('No entries returned from Cloudflare') if entries.empty?
  entries
end
cloudflare_params(metadata) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 192
# Derives the request parameters for the next API call from the stored
# state and resets the "last seen" markers.
#
# Precedence: a stored ray ID wins over a stored timestamp; when neither is
# present the default start time is used.
#
# metadata - state hash (mutated by the chosen _from_* helper; afterwards
#            'last_timestamp' and 'last_ray_id' are set to nil)
#
# Returns the params hash filled in by the chosen helper.
def cloudflare_params(metadata)
  query = {}
  case
  when metadata['last_ray_id']
    _from_ray_id(metadata, query)
  when metadata['last_timestamp']
    _from_timestamp(metadata, query)
  else
    _from_neither(metadata, query)
  end
  metadata['last_timestamp'] = nil
  metadata['last_ray_id'] = nil
  query
end
cloudflare_zone_id(domain) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 150
# Looks up the Cloudflare zone ID for a domain.
#
# Queries the '/zones' endpoint for active zones with the given name and
# picks the first returned zone whose 'name' equals the domain.
#
# domain - zone name to resolve
#
# Returns the matching zone's 'id'.
# Raises RuntimeError when the API call times out or no zone matches.
def cloudflare_zone_id(domain)
  response =
    begin
      cloudflare_api_call('/zones', { status: 'active', name: domain })
    rescue Timeout::Error
      raise 'Cloudflare API timed out. Consider adjusting the read_timeout.'
    end
  match = response['result'].find { |zone| zone['name'] == domain }
  return match['id'] if match
  raise "No zone with domain #{domain} found"
end
continue_or_sleep(metadata) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 265
# Decides, after a batch has been handled, whether to poll again right away
# or to wait.
#
# * No 'last_timestamp' was recorded but the batch was requested by
#   timestamp: move the start forward by @poll_interval, provided the
#   advanced start is not later than metadata['default_start_time'].
# * 'last_timestamp' is older than 'default_start_time': still catching up,
#   so return without sleeping.
# * Otherwise (including when there is no 'last_timestamp' at all): sleep
#   via _sleep_time.
#
# metadata - state hash; 'last_timestamp' may be set by this method
def continue_or_sleep(metadata)
  last_tstamp = metadata['last_timestamp']
  first_tstamp = metadata['first_timestamp']
  start_limit = metadata['default_start_time']
  mod_tstamp = first_tstamp.to_i + @poll_interval if first_tstamp

  if !last_tstamp && first_tstamp && mod_tstamp <= start_limit
    # nothing was recorded for the requested window, so slide it forward
    @logger.info("Incrementing start timestamp by #{@poll_interval} seconds")
    metadata['last_timestamp'] = mod_tstamp
  elsif last_tstamp && last_tstamp < start_limit
    # we won't need to sleep as we're trying to catch up
    return
  else
    # Also reached when last_timestamp is nil; comparing nil with `<` used
    # to raise NoMethodError here.
    _sleep_time
  end
end
fill_cloudflare_data(event, data) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 229
# Copies the configured fields from a Cloudflare log entry onto an event.
#
# Each entry of `fields` is a dot-separated path into the (nested) data
# hash, e.g. "cache.cacheStatus". The value found at that path is stored on
# the event under the path with dots replaced by underscores.
#
# event - object supporting []= (receives the extracted values)
# data  - the log entry; converted with Hash[] before traversal
#
# Paths that cannot be resolved, or that resolve to an empty hash, are
# skipped.
def fill_cloudflare_data(event, data)
  record = Hash[data]
  fields.each do |field|
    value = record
    field.split('.').each do |field_part|
      # Only hashes can be descended into. A nil/scalar/array in the middle
      # of the path used to raise on #fetch; treat it like a missing key.
      value = value.is_a?(Hash) ? value.fetch(field_part, {}) : {}
    end
    # in the case of cache.cacheStatus ... if cacheStatus doesn't exist, it would return {}
    # which results in ES erroring out, when passed in
    next if value == {}
    event[field.tr('.', '_')] = value
  end
end
loop_worker(queue, zone_id) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 283
# Runs one polling iteration: load state, fetch entries, emit events and
# persist the updated state.
#
# queue   - destination handed to process_entry for every entry
# zone_id - Cloudflare zone to fetch logs for
#
# When fewer than two entries come back the iteration only sleeps and
# returns nil; the stored state is not rewritten in that case.
def loop_worker(queue, zone_id)
  metadata = read_metadata
  entries = cloudflare_data(zone_id, metadata)
  @logger.info("Received #{entries.length} events")
  # if we only fetch one entry the odds are it's the one event that we asked for
  if entries.length < 2
    @logger.info(
      'Need more than 1 event to process all entries (usually because the 1 event contains the '\
      'ray_id you asked for')
    _sleep_time
    return
  end
  entries.each { |entry| process_entry(queue, metadata, entry) }
  @logger.info(metadata)
  continue_or_sleep(metadata)
  write_metadata(metadata)
end
process_entry(queue, metadata, entry) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 242
# Turns one Cloudflare log entry into an event, queues it and records the
# entry as the most recently processed one.
#
# queue    - receives the event via <<
# metadata - state hash; 'last_ray_id' and 'last_timestamp' are updated
# entry    - log entry hash; reads 'rayId' and 'timestamp'
#
# The entry whose ray ID equals metadata['first_ray_id'] is ignored because
# it was already processed in the last run.
def process_entry(queue, metadata, entry)
  already_seen = metadata['first_ray_id']
  return if already_seen && entry['rayId'] == already_seen

  event = LogStash::Event.new('host' => @host)
  fill_cloudflare_data(event, entry)
  decorate(event)
  queue << event

  metadata['last_ray_id'] = entry['rayId']
  # Cloudflare provides the timestamp in nanoseconds
  metadata['last_timestamp'] = entry['timestamp'] / 1_000_000_000
end
read_metadata() click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 85
# Loads the state persisted by the previous run from @metadata_filepath.
#
# A missing or empty file, unparseable JSON, or JSON whose top-level value
# is not an object all yield a fresh, empty state.
#
# Returns a Hash that always contains 'first_ray_id', 'last_ray_id',
# 'first_timestamp' and 'last_timestamp' (nil when unknown), plus
# 'default_start_time', which is recomputed on every call as the current
# UTC epoch seconds minus @start_from_secs_ago.
def read_metadata
  # read the ray_id of the message which was parsed last
  metadata = {}
  if File.exist?(@metadata_filepath)
    content = File.read(@metadata_filepath).strip
    unless content.empty?
      begin
        parsed = JSON.parse(content)
        # Valid JSON is not necessarily an object (e.g. "[1, 2]"); anything
        # else would break the key handling below.
        metadata = parsed if parsed.is_a?(Hash)
      rescue JSON::ParserError
        metadata = {}
      end
    end
  end
  # make sure metadata contains all the keys we need
  %w[first_ray_id last_ray_id first_timestamp last_timestamp].each do |field|
    metadata[field] = nil unless metadata.key?(field)
  end
  metadata['default_start_time'] = Time.now.getutc.to_i - @start_from_secs_ago
  metadata
end
register() click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 81
# Stores the local machine's hostname in @host; process_entry stamps it on
# every event as the 'host' field.
def register
  @host = Socket.gethostname
end
run(queue) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 303
# Main loop of the input: resolves the zone ID for @domain once, then keeps
# calling loop_worker until stop? reports true.
#
# queue - destination for generated events (forwarded to loop_worker)
#
# An error raised by loop_worker is logged (class, message, backtrace) and
# re-raised, unless stop? is already true, in which case the loop simply
# ends.
def run(queue)
  @logger.info('Starting cloudflare run')
  zone_id = cloudflare_zone_id(@domain)
  @logger.info("Resolved zone ID #{zone_id} for domain #{@domain}")
  until stop?
    begin
      loop_worker(queue, zone_id)
    rescue StandardError => e
      # errors during shutdown are expected noise; just leave the loop
      break if stop?
      @logger.error(e.class)
      @logger.error(e.message)
      @logger.error(e.backtrace.join("\n"))
      raise e
    end
  end
end
write_metadata(metadata) click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 108
# Persists the state hash as JSON to @metadata_filepath, replacing any
# previous content.
#
# metadata - state hash to store
#
# Returns the number of bytes written.
def write_metadata(metadata)
  # Serialize before touching the file: opening it for writing truncates
  # it, so a failing JSON.generate would otherwise wipe the stored state.
  payload = JSON.generate(metadata)
  File.write(@metadata_filepath, payload)
end