class Fluent::Plugin::AmazonSNSOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amazon_sns.rb, line 32
#
# Converts legacy buffer/inject parameters, runs the superclass configuration,
# and then selects the strategy used to derive the topic for each event.
#
# The strategy is stored in @topic_generator as a lambda taking (tag, record).
# The first of these settings that is truthy wins:
#   * @topic_name    - always returns that value
#   * @topic_map_key - returns the record value stored under that key
#   * @topic_map_tag - returns the tag with @remove_tag_prefix (and one
#                      optional following ".") stripped from its start
#
# NOTE(review): these instance variables are presumably populated by
# config_param declarations outside this view - confirm.
#
# @param conf the plugin configuration element handed over by the caller
# @raise [Fluent::ConfigError] when none of the three settings is given
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  @topic_generator =
    if @topic_name
      ->(_tag, _record) { @topic_name }
    elsif @topic_map_key
      ->(_tag, record) { record[@topic_map_key] }
    elsif @topic_map_tag
      # The prefix is literal text, so escape it: an unescaped "." would match
      # any character. \A anchors to the start of the tag itself (not to the
      # start of any line). The pattern is compiled once here instead of once
      # per event. A nil prefix degrades to "strip one leading dot".
      prefix_pattern = /\A#{Regexp.escape(@remove_tag_prefix.to_s)}\.?/
      ->(tag, _record) { tag.sub(prefix_pattern, '') }
    else
      raise Fluent::ConfigError, "no one way specified to decide target"
    end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_amazon_sns.rb, line 66
#
# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record, then the
# [tag, time, record] triple is packed with to_msgpack.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of Array#to_msgpack on [tag, time, injected record]
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)
  [tag, time, injected].to_msgpack
end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_amazon_sns.rb, line 71
#
# Reports that the output of #format is a msgpack binary.
#
# @return [true] always
def formatted_to_msgpack_binary?
  # Unconditional: #format always ends in a to_msgpack call.
  true
end
get_topics()
click to toggle source
# File lib/fluent/plugin/out_amazon_sns.rb, line 93
#
# Builds a lookup table of the topics exposed by @sns.
#
# Each topic is keyed by the last ":"-separated segment of its ARN.
#
# @return [Hash] last ARN segment => topic object
def get_topics
  @sns.topics.each_with_object({}) do |topic, by_name|
    name = topic.arn.split(/:/).last
    by_name[name] = topic
  end
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_amazon_sns.rb, line 75
#
# Reports whether this plugin is ready for multi-worker operation.
#
# @return [true] always
def multi_workers_ready?
  # Unconditional answer; no state is consulted.
  true
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amazon_sns.rb, line 62
#
# Shuts the plugin down by delegating entirely to the superclass.
#
# @return whatever the superclass implementation returns
def shutdown
  # No plugin-specific teardown happens here.
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amazon_sns.rb, line 48
#
# Starts the plugin: runs the superclass start, builds the SNS client and
# resource, and caches the topic lookup table.
#
# Client options are read from @aws_access_key_id, @aws_secret_access_key,
# @aws_region and @aws_proxy_uri (passed as :http_proxy). Unset values are
# handed over as nil.
#
# Side effects: assigns @sns and @topics.
def start
  super

  client_options = {
    access_key_id: @aws_access_key_id,
    secret_access_key: @aws_secret_access_key,
    region: @aws_region,
    http_proxy: @aws_proxy_uri
  }

  @sns = Aws::SNS::Resource.new(client: Aws::SNS::Client.new(client_options))
  @topics = get_topics
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_amazon_sns.rb, line 79
#
# Publishes every event in the chunk to its SNS topic.
#
# For each (tag, time, record) in the chunk:
#   * record["time"] is overwritten with the event time as a local Time
#   * the subject is taken from the record (the key is removed from the
#     record), falling back to @subject and then 'Fluent-Notification'
#   * the topic name comes from @topic_generator, with "." replaced by "-"
#   * the record is published as JSON to the matching entry in @topics;
#     an unknown topic is logged as an error and the event is skipped
#
# @param chunk a buffer chunk responding to msgpack_each
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    record["time"] = Time.at(time).localtime
    subject = record.delete(@subject_key) || @subject || 'Fluent-Notification'

    topic_name = @topic_generator.call(tag, record)
    # SNS doesn't allow "." in topic names. to_s keeps a non-String value
    # (e.g. a number taken from the record) from raising and failing the
    # whole chunk.
    topic_name = topic_name.to_s.tr('.', '-') if topic_name

    topic = @topics[topic_name]
    if topic
      topic.publish(message: record.to_json, subject: subject)
    else
      # Use the plugin logger rather than the process-global $log.
      log.error "Could not find topic '#{topic_name}' on SNS"
    end
  end
end