class Fluent::CloudwatchInput
Attributes
dimensions[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 40
# Creates the plugin instance. No state is set up here; construction is
# delegated entirely to the superclass.
def initialize
  super()
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 44
# Applies the plugin configuration and derives the CloudWatch request settings.
#
# After delegating to the superclass this method populates:
# * @dimensions      - array of { :name, :value } hashes built from the
#   comma-separated @dimensions_name / @dimensions_value settings,
# * @cw_endpoint_uri - @cw_endpoint with "https://" prepended unless it
#   already carries an http or https scheme,
# * @region          - kept when already set, otherwise the second
#   dot-separated label of @cw_endpoint.
#
# conf - configuration object; passed through to the superclass untouched.
def configure(conf)
  super

  @dimensions = []
  if @dimensions_name && @dimensions_value
    # Strip each entry so "a, b" does not produce a dimension named " b".
    names = @dimensions_name.split(",").map(&:strip)
    values = @dimensions_value.split(",").map(&:strip)
    # Pair names and values positionally; when the lists differ in length the
    # surplus entries of the longer one are dropped.
    names.first(values.size).zip(values).each do |name, value|
      @dimensions.push({ :name => name, :value => value })
    end
  elsif @dimensions_name || @dimensions_value
    # Only one of the two settings is present: pass it through as given.
    @dimensions.push({
      :name => @dimensions_name,
      :value => @dimensions_value,
    })
  end

  endpoint = URI(@cw_endpoint)
  @cw_endpoint_uri =
    if %w[http https].include?(endpoint.scheme)
      endpoint.to_s
    else
      "https://#{@cw_endpoint}"
    end

  @region ||= @cw_endpoint.split('.')[1]
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 86
# Stops the plugin: delegates to the superclass, clears the @running flag and
# terminates and joins the monitor and watcher threads.
#
# Threads that were never created (shutdown without a prior start) are skipped.
def shutdown
  super
  @running = false

  # Stop the monitor first: it is the thread that replaces @watcher, so it must
  # be gone before the watcher is terminated or it could spawn a new one.
  if @monitor
    @monitor.terminate
    @monitor.join
  end

  if @watcher
    @watcher.terminate
    @watcher.join
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 76
# Starts the plugin: delegates to the superclass, marks the plugin as running,
# records the current time in @updated and spawns the watcher and monitor
# threads (running #watch and #monitor respectively).
def start
  super
  @running = true
  @updated = Time.now
  # Create the lock before spawning the threads that synchronize on it, so
  # neither thread can ever observe a nil @mutex.
  @mutex = Mutex.new
  @watcher = Thread.new(&method(:watch))
  @monitor = Thread.new(&method(:monitor))
end
Private Instance Methods
monitor()
click to toggle source
If the watcher thread has not updated its timestamp within the last @interval * 2 seconds, restart it.
# File lib/fluent/plugin/in_cloudwatch.rb, line 98
# Supervisor loop for the watcher thread.
#
# While @running is set, wakes up every half @interval and, under @mutex,
# compares @updated with the current time. When @updated is older than
# @interval * 2 seconds the current watcher thread is killed, @updated is
# reset to now and a fresh watcher thread running #watch is started.
def monitor
  log.debug "cloudwatch: monitor thread starting"
  while @running
    # Float division on purpose: with an Integer @interval of 1, `@interval / 2`
    # is 0 and `sleep 0` would turn this loop into a busy spin.
    sleep @interval / 2.0
    @mutex.synchronize do
      log.debug "cloudwatch: last updated at #{@updated}"
      now = Time.now
      if @updated < now - @interval * 2
        log.warn "cloudwatch: watcher thread is not working after #{@updated}. Restarting..."
        @watcher.kill
        # Reset the timestamp so the replacement gets a full grace period.
        @updated = now
        @watcher = Thread.new(&method(:watch))
      end
    end
  end
end
output()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch.rb, line 147
# Fetches the latest datapoint of every configured metric and emits it.
#
# @metric_name is a comma-separated list of "Name" or "Name:Statistic"
# entries; entries without a statistic fall back to @statistics. For each
# entry a get_metric_statistics request is issued on @cw covering the last
# ten periods up to (now - @offset). The datapoint with the greatest timestamp
# is emitted as { name => value } merged with @record_attr. When there are no
# datapoints, either a zero is emitted (if @emit_zero is set) or a warning is
# logged.
def output
  @metric_name.split(",").each do |m|
    name, s = m.split(":")
    s ||= @statistics
    now = Time.now - @offset
    log.debug("now #{now}")
    statistics = @cw.get_metric_statistics({
      :namespace   => @namespace,
      :metric_name => name,
      :statistics  => [s],
      :dimensions  => @dimensions,
      :start_time  => (now - @period * 10).iso8601,
      :end_time    => now.iso8601,
      :period      => @period,
    })

    datapoints = statistics[:datapoints]
    if !datapoints.empty?
      datapoint = datapoints.max_by { |h| h[:timestamp] }
      # The request uses the CamelCase statistic name ("SampleCount") while the
      # datapoint is read with a snake_case key (:sample_count); a plain
      # downcase would look up :samplecount instead.
      key = s.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase.to_sym
      data = datapoint[key]

      # unix time
      catch_time = datapoint[:timestamp].to_i
      router.emit(tag, catch_time, { name => data }.merge(@record_attr))
    elsif @emit_zero
      router.emit(tag, now.to_i, { name => 0 }.merge(@record_attr))
    else
      log.warn "cloudwatch: #{@namespace} #{@dimensions_name} #{@dimensions_value} #{name} #{s} datapoints is empty"
    end
  end
end
watch()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch.rb, line 115
# Body of the watcher thread.
#
# Optionally sleeps for a random fraction of @interval first (when
# @delayed_start is set), then builds the CloudWatch client in @cw and calls
# #output once. Afterwards, while @running is set, it ticks once per second
# and calls #output again whenever at least @interval seconds have passed
# since the previous run, recording the completion time in @updated under
# @mutex.
def watch
  log.debug "cloudwatch: watch thread starting"

  if @delayed_start
    jitter = rand * @interval
    log.debug "cloudwatch: delay at start #{jitter} sec"
    sleep jitter
  end

  @cw = Aws::CloudWatch::Client.new(
    region: @region,
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    endpoint: @cw_endpoint_uri,
    http_open_timeout: @open_timeout,
    http_read_timeout: @read_timeout
  )

  output
  last_run = Time.now

  while @running
    # The timestamp is taken before the one-second nap, as the elapsed-time
    # check below is based on the moment the tick began.
    tick = Time.now
    sleep 1
    next if tick - last_run < @interval

    output
    last_run = tick
    @mutex.synchronize { @updated = Time.now }
  end
end