class Fluent::CloudwatchInput

Attributes

dimensions[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 40
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 44
def configure(conf)
  super

  @dimensions = []
  if @dimensions_name && @dimensions_value
    names = @dimensions_name.split(",").each
    values = @dimensions_value.split(",").each
    loop do
      @dimensions.push({
        :name => names.next,
        :value => values.next,
      })
    end
  elsif @dimensions_name || @dimensions_value
    @dimensions.push({
      :name => @dimensions_name,
      :value => @dimensions_value,
    })
  end

  endpoint = URI(@cw_endpoint)
  if endpoint.scheme != "http" && endpoint.scheme != "https"
    @cw_endpoint_uri = "https://#{@cw_endpoint}"
  else
    @cw_endpoint_uri = endpoint.to_s
  end

  if !@region
    @region = @cw_endpoint.split('.')[1]
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 86
def shutdown
  super
  @running = false
  @watcher.terminate
  @monitor.terminate
  @watcher.join
  @monitor.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch.rb, line 76
def start
  super

  @running = true
  @updated = Time.now
  @watcher = Thread.new(&method(:watch))
  @monitor = Thread.new(&method(:monitor))
  @mutex   = Mutex.new
end

Private Instance Methods

monitor() click to toggle source

if watcher thread was not update timestamp in recent @interval * 2 sec., restarting it.

# File lib/fluent/plugin/in_cloudwatch.rb, line 98
def monitor
  log.debug "cloudwatch: monitor thread starting"
  while @running
    sleep @interval / 2
    @mutex.synchronize do
      log.debug "cloudwatch: last updated at #{@updated}"
      now = Time.now
      if @updated < now - @interval * 2
        log.warn "cloudwatch: watcher thread is not working after #{@updated}. Restarting..."
        @watcher.kill
        @updated = now
        @watcher = Thread.new(&method(:watch))
      end
    end
  end
end
output() click to toggle source
# File lib/fluent/plugin/in_cloudwatch.rb, line 147
def output
  @metric_name.split(",").each {|m|
    name, s = m.split(":")
    s ||= @statistics
    now = Time.now - @offset
    log.debug("now #{now}")
    statistics = @cw.get_metric_statistics({
      :namespace   => @namespace,
      :metric_name => name,
      :statistics  => [s],
      :dimensions  => @dimensions,
      :start_time  => (now - @period*10).iso8601,
      :end_time    => now.iso8601,
      :period      => @period,
    })
    if not statistics[:datapoints].empty?
      datapoint = statistics[:datapoints].sort_by{|h| h[:timestamp]}.last
      data = datapoint[s.downcase.to_sym]

      # unix time
      catch_time = datapoint[:timestamp].to_i
      router.emit(tag, catch_time, { name => data }.merge(@record_attr))
    elsif @emit_zero
      router.emit(tag, now.to_i, { name => 0 }.merge(@record_attr))
    else
      log.warn "cloudwatch: #{@namespace} #{@dimensions_name} #{@dimensions_value} #{name} #{s} datapoints is empty"
    end
  }
end
watch() click to toggle source
# File lib/fluent/plugin/in_cloudwatch.rb, line 115
def watch
  log.debug "cloudwatch: watch thread starting"
  if @delayed_start
    delay = rand() * @interval
    log.debug "cloudwatch: delay at start #{delay} sec"
    sleep delay
  end

  @cw = Aws::CloudWatch::Client.new(
    :region            => @region,
    :access_key_id     => @aws_key_id,
    :secret_access_key => @aws_sec_key,
    :endpoint          => @cw_endpoint_uri,
    :http_open_timeout => @open_timeout,
    :http_read_timeout => @read_timeout,
  )
  output

  started = Time.now
  while @running
    now = Time.now
    sleep 1
    if now - started >= @interval
      output
      started = now
      @mutex.synchronize do
        @updated = Time.now
      end
    end
  end
end