class LogStash::Inputs::Cloudflare
Public Instance Methods
_build_uri(endpoint, params)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 114 def _build_uri(endpoint, params) uri = URI("https://api.cloudflare.com/client/v4#{endpoint}") uri.query = URI.encode_www_form(params) uri end
_from_neither(metadata, params)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 184 def _from_neither(metadata, params) @logger.info('last_timestamp or last_ray_id from previous run NOT set') params['start'] = metadata['default_start_time'] params['end'] = params['start'] + @poll_interval metadata['first_ray_id'] = nil metadata['first_timestamp'] = params['start'] end
_from_ray_id(metadata, params)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 163 def _from_ray_id(metadata, params) # We have the previous ray ID so we use that and set the batch_size # in order to fetch a certain number of events @logger.info("Previous ray_id detected: #{metadata['last_ray_id']}") params['start_id'] = metadata['last_ray_id'] params['count'] = @batch_size metadata['first_ray_id'] = metadata['last_ray_id'] metadata['first_timestamp'] = nil end
_from_timestamp(metadata, params)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 173 def _from_timestamp(metadata, params) # We have the last timestamp so we use it and use the poll_interval dt_tstamp = DateTime.strptime(metadata['last_timestamp'], '%s') @logger.info('last_timestamp from previous run detected: '\ "#{metadata['last_timestamp']} #{dt_tstamp}") params['start'] = metadata['last_timestamp'].to_i params['end'] = params['start'] + @poll_interval metadata['first_ray_id'] = nil metadata['first_timestamp'] = params['start'] end
_process_response(response, uri, multi_line)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 120 def _process_response(response, uri, multi_line) content = response_body(response) if response.code != '200' raise CloudflareAPIError.new(uri.to_s, response, content), 'Error calling Cloudflare API' end @logger.info("Received response from Cloudflare API (status_code: #{response.code})") lines = parse_content(content) return lines if multi_line lines[0] end
_sleep_time()
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 256 def _sleep_time @logger.info("Waiting #{@poll_time} seconds before requesting data"\ 'from Cloudflare again') # We're staggering the poll_time so we don't block the worker for the whole 15s (@poll_time * 2).times do sleep(0.5) end end
cloudflare_api_call(endpoint, params, multi_line = false)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 132 def cloudflare_api_call(endpoint, params, multi_line = false) uri = _build_uri(endpoint, params) @logger.info('Sending request to Cloudflare') Net::HTTP.start(uri.hostname, uri.port, use_ssl: true, open_timeout: @open_timeout, read_timeout: @read_timeout) do |http| request = Net::HTTP::Get.new( uri.request_uri, 'Accept-Encoding' => 'gzip', 'X-Auth-Email' => @auth_email, 'X-Auth-Key' => @auth_key ) response = http.request(request) return _process_response(response, uri, multi_line) end end
cloudflare_data(zone_id, metadata)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 207 def cloudflare_data(zone_id, metadata) @logger.info("cloudflare_data metadata: '#{metadata}'") params = cloudflare_params(metadata) @logger.info("Using params #{params}") begin entries = cloudflare_api_call("/zones/#{zone_id}/logs/requests", params, multi_line: true) rescue CloudflareAPIError => err err.errors.each do |error| @logger.error("Cloudflare error code: #{error['code']}: "\ "#{error['message']}") end entries = [] rescue Timeout::Error @logger.error('Cloudflare API timed out. Consider adjusting the read_timeout.') entries = [] end return entries unless entries.empty? @logger.info('No entries returned from Cloudflare') entries end
cloudflare_params(metadata)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 192 def cloudflare_params(metadata) params = {} # if we have ray_id, we use that as a starting point if metadata['last_ray_id'] _from_ray_id(metadata, params) elsif metadata['last_timestamp'] _from_timestamp(metadata, params) else _from_neither(metadata, params) end metadata['last_timestamp'] = nil metadata['last_ray_id'] = nil params end
cloudflare_zone_id(domain)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 150 def cloudflare_zone_id(domain) params = { status: 'active', name: domain } begin response = cloudflare_api_call('/zones', params) rescue Timeout::Error raise 'Cloudflare API timed out. Consider adjusting the read_timeout.' end response['result'].each do |zone| return zone['id'] if zone['name'] == domain end raise "No zone with domain #{domain} found" end
continue_or_sleep(metadata)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 265 def continue_or_sleep(metadata) mod_tstamp = metadata['first_timestamp'].to_i + @poll_interval if metadata['first_timestamp'] if !metadata['last_timestamp'] && metadata['first_timestamp'] && \ mod_tstamp <= metadata['default_start_time'] # we need to increment the timestamp by 2 minutes as we haven't # received any results in the last batch ... also make sure we # only do this if the end date is more than 10 minutes from the # current time @logger.info("Incrementing start timestamp by #{@poll_interval} seconds") metadata['last_timestamp'] = mod_tstamp elsif metadata['last_timestamp'] < metadata['default_start_time'] # we won't need to sleep as we're trying to catch up return else _sleep_time end end
fill_cloudflare_data(event, data)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 229 def fill_cloudflare_data(event, data) fields.each do |field| value = Hash[data] field.split('.').each do |field_part| value = value.fetch(field_part, {}) end # in the case of cache.cacheStatus ... if cacheStatus doesn't exist, it would return {} # which results in ES erroring out, when passed in next if value == {} event[field.tr('.', '_')] = value end end
loop_worker(queue, zone_id)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 283 def loop_worker(queue, zone_id) metadata = read_metadata entries = cloudflare_data(zone_id, metadata) @logger.info("Received #{entries.length} events") # if we only fetch one entry the odds are it's the one event that we asked for if entries.length <= 1 @logger.info( 'Need more than 1 event to process all entries (usually because the 1 event contains the '\ 'ray_id you asked for') _sleep_time return end entries.each do |entry| process_entry(queue, metadata, entry) end @logger.info(metadata) continue_or_sleep(metadata) write_metadata(metadata) end
process_entry(queue, metadata, entry)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 242 def process_entry(queue, metadata, entry) # skip the first ray_id because we already processed it # in the last run return if metadata['first_ray_id'] && \ entry['rayId'] == metadata['first_ray_id'] event = LogStash::Event.new('host' => @host) fill_cloudflare_data(event, entry) decorate(event) queue << event metadata['last_ray_id'] = entry['rayId'] # Cloudflare provides the timestamp in nanoseconds metadata['last_timestamp'] = entry['timestamp'] / 1_000_000_000 end
read_metadata()
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 85 def read_metadata # read the ray_id of the message which was parsed last metadata = {} if File.exist?(@metadata_filepath) content = File.read(@metadata_filepath).strip unless content.empty? begin metadata = JSON.parse(content) rescue JSON::ParserError metadata = {} end end end # make sure metadata contains all the keys we need %w(first_ray_id last_ray_id first_timestamp last_timestamp).each do |field| metadata[field] = nil unless metadata.key?(field) end metadata['default_start_time'] = \ Time.now.getutc.to_i - @start_from_secs_ago metadata end
register()
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 81 def register @host = Socket.gethostname end
run(queue)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 303 def run(queue) @logger.info('Starting cloudflare run') zone_id = cloudflare_zone_id(@domain) @logger.info("Resolved zone ID #{zone_id} for domain #{@domain}") until stop? begin loop_worker(queue, zone_id) rescue => exception break if stop? @logger.error(exception.class) @logger.error(exception.message) @logger.error(exception.backtrace.join("\n")) raise(exception) end end # until loop end
write_metadata(metadata)
click to toggle source
# File lib/logstash/inputs/cloudflare.rb, line 108 def write_metadata(metadata) File.open(@metadata_filepath, 'w') do |file| file.write(JSON.generate(metadata)) end end