class Fluent::Plugin::CloudwatchLogsInput
Constants
- DEFAULT_STORAGE_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 65
# Creates the plugin instance.
#
# Runs the superclass initializer, loads the CloudWatch Logs SDK on demand
# (the require is deliberately deferred until an instance is built), and
# starts with no parser selected.
def initialize
  super
  require 'aws-sdk-cloudwatchlogs'
  @parser = nil
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 72
# Applies the plugin configuration.
#
# Converts legacy parser parameters, runs the superclass configuration, sets
# up the optional parser, turns the textual start/end times into epoch
# milliseconds, and creates the storage used for next tokens.
#
# @param conf the configuration element handed over by the framework
# @raise [Fluent::ConfigError] when both bounds are set and end_time is
#   earlier than start_time
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super
  configure_parser(conf)

  # Parse a timestamp with @time_range_format and express it in whole milliseconds.
  to_epoch_millis = ->(text) { (Time.strptime(text, @time_range_format).to_f * 1000).floor }
  @start_time &&= to_epoch_millis.call(@start_time)
  @end_time &&= to_epoch_millis.call(@end_time)

  window_inverted = @start_time && @end_time && (@end_time < @start_time)
  if window_inverted
    raise Fluent::ConfigError, "end_time(#{@end_time}) should be greater than start_time(#{@start_time})."
  end

  @next_token_storage = storage_create(usage: 'store_next_tokens', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 142
# Flags the polling loop to stop (it checks @finished on every pass) and
# then hands over to the superclass shutdown.
def shutdown
  @finished = true
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 85
# Builds the CloudWatch Logs client and launches the polling thread.
#
# Credentials are chosen in this order: STS assume-role, web identity,
# ECS container credentials, then a static key pair when both key id and
# secret are configured. When none applies no :credentials option is passed
# to the client.
def start
  super

  options = {}
  options[:region] = @region if @region
  options[:endpoint] = @endpoint if @endpoint
  options[:ssl_verify_peer] = @ssl_verify_peer
  options[:http_proxy] = @http_proxy if @http_proxy

  if @aws_use_sts
    Aws.config[:region] = options[:region]
    credentials_options = {
      role_arn: @aws_sts_role_arn,
      role_session_name: @aws_sts_session_name,
      external_id: @aws_sts_external_id,
      policy: @aws_sts_policy,
      duration_seconds: @aws_sts_duration_seconds
    }
    credentials_options[:sts_endpoint_url] = @aws_sts_endpoint_url if @aws_sts_endpoint_url
    if @region && @aws_sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(region: @region, endpoint: @aws_sts_endpoint_url)
    elsif @region
      credentials_options[:client] = Aws::STS::Client.new(region: @region)
    end
    options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
  elsif @web_identity_credentials
    c = @web_identity_credentials
    credentials_options = {}
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:web_identity_token_file] = c.web_identity_token_file
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    credentials_options[:client] = Aws::STS::Client.new(region: @region) if @region
    options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)
  elsif @aws_ecs_authentication
    # Collect AWS credentials from the ECS relative-URI environment variable.
    # The provider object itself is handed to the client (rather than a
    # one-off `.credentials` snapshot) so expiring task credentials can be
    # refreshed by the SDK.
    aws_container_credentials_relative_uri = ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
    options[:credentials] = Aws::ECSCredentials.new(credential_path: aws_container_credentials_relative_uri)
  elsif @aws_key_id && @aws_sec_key
    options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key)
  end

  @logs = Aws::CloudWatchLogs::Client.new(options)

  # Resolve the configured handler symbol to its module BEFORE the polling
  # thread exists; the thread calls @json_handler.load and must never see
  # the raw symbol.
  @json_handler = case @json_handler
                  when :yajl
                    Yajl
                  when :json
                    JSON
                  end

  @finished = false
  thread_create(:in_cloudwatch_logs_runner, &method(:run))
end
state_key_for(log_stream_name, log_group_name = nil)
click to toggle source
Not private, so that tests can call it directly.
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 148
# Derives the state key for a stream, optionally scoped by its group.
#
# Path separators inside the names are replaced with '-' and the pieces are
# appended to @state_file with '_'. Without a stream name the bare
# @state_file is returned, even if a group name was given.
#
# @param log_stream_name [String, nil]
# @param log_group_name [String, nil]
# @return the derived key, or @state_file itself when no stream is given
def state_key_for(log_stream_name, log_group_name = nil)
  return @state_file unless log_stream_name

  flatten = ->(name) { name.gsub(File::SEPARATOR, '-') }
  segments = []
  segments << flatten.call(log_group_name) if log_group_name
  segments << flatten.call(log_stream_name)
  "#{@state_file}_#{segments.join('_')}"
end
Private Instance Methods
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 159
# Selects the parser, if one is configured.
#
# A 'format' entry in the configuration takes precedence and creates the
# parser with no explicit section; otherwise the first <parse> section, when
# present, is passed to parser_create. With neither, @parser is left as is.
#
# @param conf configuration element responding to [] and elements
def configure_parser(conf)
  return @parser = parser_create if conf['format']

  parse_section = conf.elements('parse').first
  @parser = parser_create(conf: parse_section) if parse_section
end
describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = nil)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 339
# Lists every log group whose name starts with the given prefix.
#
# Pages are fetched in a loop until the service stops returning a next
# token. Each page request goes through throttling_handler, so throttled
# calls follow the same retry policy as the other API calls in this plugin.
#
# @param log_group_name_prefix [String] prefix sent to the API
# @param log_groups [Array, nil] groups collected so far; extended in place
#   when given
# @param next_token [String, nil] token of the page to start from
# @return [Array] all collected log group entries
def describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = nil)
  loop do
    request = { log_group_name_prefix: log_group_name_prefix }
    request[:next_token] = next_token if next_token

    response = throttling_handler('describe_log_groups') do
      @logs.describe_log_groups(request)
    end

    log_groups = log_groups ? log_groups.concat(response.log_groups) : response.log_groups
    next_token = response.next_token
    break unless next_token
  end
  log_groups
end
describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil, log_group_name=nil)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 306
# Lists the log streams of a group, following pagination to the end.
#
# Every page request is issued through throttling_handler.
#
# @param log_stream_name_prefix [String, nil] optional stream-name prefix
# @param log_streams [Array, nil] streams collected so far; extended in
#   place when given
# @param next_token [String, nil] token of the page to start from
# @param log_group_name [String, nil] group to query; @log_group_name is
#   used when nil
# @return [Array] all collected log stream entries
def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil, log_group_name = nil)
  target_group = log_group_name.nil? ? @log_group_name : log_group_name

  loop do
    page = throttling_handler('describe_log_streams') do
      params = { log_group_name: target_group }
      params[:next_token] = next_token if next_token
      params[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix
      @logs.describe_log_streams(params)
    end

    if log_streams
      log_streams.concat(page.log_streams)
    else
      log_streams = page.log_streams
    end

    next_token = page.next_token
    break unless next_token
  end

  log_streams
end
emit(group, stream, event, metadata)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 242
# Turns one CloudWatch event into a routed record.
#
# With a parser configured, the message is parsed by it and the parser's
# time is used unless @use_aws_timestamp asks for the event timestamp.
# Without a parser, the message is decoded with @json_handler and the event
# timestamp (milliseconds, reduced to whole seconds) is always used; a JSON
# parse failure is logged and routed as an error event instead.
#
# @param group [String] log group name, added to the record when
#   @add_log_group_name is set
# @param stream [String] log stream name (not used by this method)
# @param event object responding to message and timestamp
# @param metadata [Hash] merged under the "metadata" key unless empty
def emit(group, stream, event, metadata)
  event_seconds = -> { (event.timestamp / 1000).floor }

  # Shared decoration applied to a record right before it is routed.
  decorate = lambda do |record|
    record[@log_group_name_key] = group if @add_log_group_name
    record.merge!("metadata" => metadata) unless metadata.empty?
    record
  end

  if @parser
    @parser.parse(event.message) do |time, record|
      time = event_seconds.call if @use_aws_timestamp
      router.emit(@tag, time, decorate.call(record))
    end
  else
    time = event_seconds.call
    begin
      record = decorate.call(@json_handler.load(event.message))
      router.emit(@tag, time, record)
    rescue JSON::ParserError, Yajl::ParseError => error # Catch parser errors
      log.error "Invalid JSON encountered while parsing event.message"
      router.emit_error_event(@tag, time, { message: event.message }, error)
    end
  end
end
get_events(log_group_name, log_stream_name)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 274
# Fetches the next batch of events for one stream.
#
# The request resumes from the stored next token when there is one and is
# bounded by @start_time / @end_time when those are set. The forward token
# returned by the service is stored when valid_next_token accepts it. The
# whole call runs inside throttling_handler.
#
# @param log_group_name [String]
# @param log_stream_name [String]
# @return the events of the response
def get_events(log_group_name, log_stream_name)
  throttling_handler('get_log_events') do
    request = { log_group_name: log_group_name, log_stream_name: log_stream_name }
    request[:start_time] = @start_time if @start_time
    request[:end_time] = @end_time if @end_time

    # Tokens are keyed by group only when groups are discovered by prefix;
    # nil is the default group argument of both token helpers.
    token_group = @use_log_group_name_prefix ? log_group_name : nil
    saved_token = next_token(log_stream_name, token_group)

    request[:next_token] = saved_token unless saved_token.nil? || saved_token.empty?
    request[:start_from_head] = true if read_from_head?(saved_token)

    response = @logs.get_log_events(request)
    forward_token = response.next_forward_token
    if valid_next_token(saved_token, forward_token)
      store_next_token(forward_token, log_stream_name, token_group)
    end

    response.events
  end
end
get_todays_date()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 360
# @return [String] today's date (as given by Date.today) in "YYYY/MM/DD" form
def get_todays_date
  today = Date.today
  today.strftime("%Y/%m/%d")
end
get_yesterdays_date()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 364
# @return [String] the day before Date.today in "YYYY/MM/DD" form
def get_yesterdays_date
  yesterday = Date.today.prev_day
  yesterday.strftime("%Y/%m/%d")
end
migrate_state_file_to_storage(log_stream_name)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 167
# Moves a token kept in a state file into the token storage.
#
# The file located at the stream's state key is read, its content (minus the
# trailing newline) is stored under that same key as a symbol, and the file
# is deleted afterwards.
#
# @param log_stream_name [String, nil] stream whose state file is migrated
def migrate_state_file_to_storage(log_stream_name)
  state_path = state_key_for(log_stream_name)
  saved_token = File.read(state_path).chomp
  @next_token_storage.put(state_path.to_s.to_sym, saved_token)
  File.delete(state_path)
end
next_token(log_stream_name, log_group_name = nil)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 172
# Looks up the stored next token for a stream.
#
# When the storage is persistent and a state file exists for the stream, it
# is migrated into the storage first.
#
# NOTE(review): the state-file check and the migration use the stream-only
# key, while the lookup uses the group-scoped key - confirm this is the
# intended behaviour when log_group_name is given.
#
# @param log_stream_name [String, nil]
# @param log_group_name [String, nil]
# @return the stored token, as returned by the storage's get
def next_token(log_stream_name, log_group_name = nil)
  if @next_token_storage.persistent
    migrate_state_file_to_storage(log_stream_name) if File.exist?(state_key_for(log_stream_name))
  end

  storage_key = state_key_for(log_stream_name, log_group_name).to_s.to_sym
  @next_token_storage.get(storage_key)
end
read_from_head?(next_token)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 302
# Decides whether the request should set start_from_head.
#
# @param next_token [String, nil] the stored token, if any
# @return true when the token is non-empty; otherwise @start_time if set,
#   otherwise @end_time (so the result is truthy when any of the three is
#   present)
def read_from_head?(next_token)
  token_present = !(next_token.nil? || next_token.empty?)
  token_present || @start_time || @end_time
end
run()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 183
# Polling loop: runs until @finished is set.
#
# Once per @fetch_interval it resolves the log groups to read (by prefix
# lookup or the single configured group), then for each group either
# discovers streams by prefix / date or reads the single configured stream,
# and emits every fetched event. The loop sleeps one second between checks.
def run
  @next_fetch_time = Time.now

  until @finished
    if Time.now > @next_fetch_time
      @next_fetch_time += @fetch_interval

      log_group_names =
        if @use_log_group_name_prefix
          describe_log_groups(@log_group_name).map(&:log_group_name)
        else
          [@log_group_name]
        end

      log_group_names.each do |log_group_name|
        if @use_log_stream_name_prefix || @use_todays_log_stream
          log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name
          begin
            log_streams = describe_log_streams(log_stream_name_prefix, nil, nil, log_group_name)
            if @use_todays_log_stream
              # Yesterday's streams must be looked up in the same group as
              # today's; without the explicit group the lookup would fall
              # back to the configured @log_group_name.
              log_streams.concat(describe_log_streams(get_yesterdays_date, nil, nil, log_group_name))
            end

            log_streams.each do |log_stream|
              log_stream_name = log_stream.log_stream_name
              events = get_events(log_group_name, log_stream_name)
              metadata = if @include_metadata
                           {
                             "log_stream_name" => log_stream_name,
                             "log_group_name" => log_group_name
                           }
                         else
                           {}
                         end
              events.each do |event|
                emit(log_group_name, log_stream_name, event, metadata)
              end
            end
          rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
            log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found"
            next
          end
        else
          events = get_events(log_group_name, @log_stream_name)
          # Report the group actually being read (it differs from
          # @log_group_name when groups are discovered by prefix).
          metadata = if @include_metadata
                       {
                         "log_stream_name" => @log_stream_name,
                         "log_group_name" => log_group_name
                       }
                     else
                       {}
                     end
          events.each do |event|
            emit(log_group_name, @log_stream_name, event, metadata)
          end
        end
      end
    end
    sleep 1
  end
end
store_next_token(token, log_stream_name = nil, log_group_name = nil)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 179
# Saves a token in the token storage under the stream's (optionally
# group-scoped) state key.
#
# @param token the token to store
# @param log_stream_name [String, nil]
# @param log_group_name [String, nil]
def store_next_token(token, log_stream_name = nil, log_group_name = nil)
  storage_key = state_key_for(log_stream_name, log_group_name).to_s.to_sym
  @next_token_storage.put(storage_key, token)
end
throttling_handler(method_name) { || ... }
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 326
# Runs the given block, retrying it while the service reports throttling.
#
# When throttling_retry_seconds is set, a ThrottlingException is logged, the
# handler sleeps for that many seconds and runs the block again; this
# repeats for as long as the block keeps being throttled. `retry` is used
# instead of a nested call so repeated throttling does not deepen the call
# stack. When throttling_retry_seconds is not set the exception is re-raised.
#
# @param method_name [String] name of the API call, used in the log message
# @yield the API call to protect
# @return the block's result
# @raise [Aws::CloudWatchLogs::Errors::ThrottlingException] when retrying is
#   not configured
def throttling_handler(method_name)
  yield
rescue Aws::CloudWatchLogs::Errors::ThrottlingException => err
  raise err unless throttling_retry_seconds

  log.warn "ThrottlingException #{method_name}. Waiting #{throttling_retry_seconds} seconds to retry."
  sleep throttling_retry_seconds
  retry
end
valid_next_token(prev_token, next_token)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 356
# Tells whether a newly returned token is worth storing.
#
# @param prev_token [String, nil] the token used for the request
# @param next_token [String, nil] the token returned by the service
# @return next_token itself when it is nil/false; otherwise whether it
#   (minus a trailing newline) differs from prev_token
def valid_next_token(prev_token, next_token)
  return next_token unless next_token

  prev_token != next_token.chomp
end