class Fluent::UnomalyOutput

Public Instance Methods

configure(conf)

This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.

Calls superclass method
# File lib/fluent/plugin/out_unomaly.rb, line 38
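def configure(conf)
  # Set the buffer defaults before calling super so that the parent class
  # sees them when it parses the configuration.
  conf["buffer_chunk_limit"] ||= "500k"
  conf["flush_interval"] ||= "1s"
  super
end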
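
The `||=` assignments only fill in a value when the key is missing or nil, so settings supplied by the user take precedence over these defaults. A minimal sketch of that behavior, using a plain Hash as a stand-in for the `conf` object (the helper name `apply_buffer_defaults` is hypothetical and not part of the plugin):

```ruby
# Hypothetical helper: applies the same two defaults as configure,
# but to a plain Hash instead of a Fluentd configuration element.
def apply_buffer_defaults(conf)
  conf["buffer_chunk_limit"] ||= "500k"
  conf["flush_interval"] ||= "1s"
  conf
end

# The user overrides flush_interval but leaves buffer_chunk_limit unset.
user_conf = { "flush_interval" => "5s" }
result = apply_buffer_defaults(user_conf)

puts result["flush_interval"]      # the user's value is kept
puts result["buffer_chunk_limit"]  # the default is filled in
```
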
flatten(data, prefix)
# File lib/fluent/plugin/out_unomaly.rb, line 125
def flatten(data, prefix)
  # Recursively flatten nested Hashes and Arrays into a single-level Hash
  # whose keys are dot-separated paths and whose values are strings.
  ret = {}
  if data.is_a? Hash
    data.each do |key, value|
      path = prefix.to_s.empty? ? key.to_s : "#{prefix}.#{key}"
      ret.merge! flatten(value, path)
    end
  elsif data.is_a? Array
    data.each_with_index { |val, index| ret.merge! flatten(val, "#{prefix}.#{index}") }
  else
    return { prefix => data.to_s }
  end

  ret
end
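
To illustrate what flatten produces, the sketch below defines a standalone copy of the method (outside the plugin class, purely for demonstration) and applies it to a small nested record. The sample record is invented for this example.

```ruby
# Standalone copy of the plugin's flatten logic, for demonstration only.
def flatten(data, prefix)
  ret = {}
  if data.is_a? Hash
    data.each do |key, value|
      path = prefix.to_s.empty? ? key.to_s : "#{prefix}.#{key}"
      ret.merge! flatten(value, path)
    end
  elsif data.is_a? Array
    data.each_with_index { |val, index| ret.merge! flatten(val, "#{prefix}.#{index}") }
  else
    # Leaf value: stored under the accumulated path, converted to a string.
    return { prefix => data.to_s }
  end
  ret
end

# Invented sample record with a nested Hash and an Array.
record = { "a" => { "b" => 1, "c" => [10, 20] }, "d" => "x" }
flat = flatten(record, "")

# flat is now equal to:
#   { "a.b" => "1", "a.c.0" => "10", "a.c.1" => "20", "d" => "x" }
flat.each { |key, value| puts "#{key} = #{value}" }
```

Note that every leaf value is converted with `to_s`, so numbers come out as strings, and that empty Hashes or Arrays contribute no keys at all.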
format(tag, time, record)

This method is called when an event reaches Fluentd. It converts the event to a raw string (here, a MessagePack-encoded array of tag, time, and record).

# File lib/fluent/plugin/out_unomaly.rb, line 58
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
send_batch(events)
# File lib/fluent/plugin/out_unomaly.rb, line 98
def send_batch(events)
  url = @host + @api_path
  body = events.to_json
  ssl = url.start_with?('https')
  uri = URI.parse(url)
  header = {'Content-Type' => 'application/json'}

  http = Net::HTTP.new(uri.host, uri.port)
  if ssl
    http.use_ssl = true
    if @accept_self_signed_certs
      http.verify_mode = OpenSSL::SSL::VERIFY_NONE
    end
  end

  log.info "Sending #{events.length} events to unomaly at #{@host}#{@api_path} (batch_size=#{@batch_size})"

  request = Net::HTTP::Post.new(uri.request_uri, header)
  request.body = body

  resp = http.request(request)
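  # Log the HTTP status code and reason phrase rather than the response object itself.
  unless resp.kind_of? Net::HTTPSuccess
    log.error "Error sending batch: #{resp.code} #{resp.message}"
  end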
end
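
The request construction in send_batch can be tried out without a network connection. The following is a minimal sketch under stated assumptions: `build_batch_request` is a hypothetical helper that is not part of the plugin, and the host and API path are placeholders rather than real Unomaly endpoints. It builds the client and the POST request the same way but returns them instead of sending anything.

```ruby
require 'net/http'
require 'openssl'
require 'uri'
require 'json'

# Hypothetical helper: mirrors the setup steps of send_batch, without sending.
def build_batch_request(host, api_path, events, accept_self_signed_certs: false)
  url = host + api_path
  uri = URI.parse(url)

  # Net::HTTP.new only creates the client object; no connection is opened yet.
  http = Net::HTTP.new(uri.host, uri.port)
  if url.start_with?('https')
    http.use_ssl = true
    # VERIFY_NONE turns off certificate verification entirely.
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE if accept_self_signed_certs
  end

  header = { 'Content-Type' => 'application/json' }
  request = Net::HTTP::Post.new(uri.request_uri, header)
  request.body = events.to_json
  [http, request]
end

http, request = build_batch_request(
  'https://unomaly.example.com',  # placeholder host (assumption)
  '/batch',                       # placeholder API path (assumption)
  [{ message: 'disk full', source: 'web-1' }]
)

puts request.path
puts request['Content-Type']
puts request.body
```

Calling `http.request(request)` on the returned pair would perform the actual POST, as send_batch does.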
shutdown()

This method is called when shutting down. Shut down the thread and close sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_unomaly.rb, line 52
def shutdown
  super
end
start()

This method is called when starting. Open sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_unomaly.rb, line 46
def start
  super
end
write(chunk)

This method is called every flush interval. Write the buffer chunk to files or databases here. 'chunk' is a buffer chunk that includes multiple formatted events. You can use 'data = chunk.read' to get all events and 'chunk.open {|io| … }' to get IO objects.

NOTE: This method is called by an internal thread, not Fluentd's main thread, so waiting on IO here does not block other plugins.

# File lib/fluent/plugin/out_unomaly.rb, line 69
def write(chunk)
  documents = []
  chunk.msgpack_each do |(tag, time, record)|
    unomaly_event = {
        message: record[@message_key].to_s,
        source: record[@source_key].to_s,
        timestamp: Time.at(time).utc.to_datetime.rfc3339
    }
    metadata = record.to_hash

    metadata.delete(@source_key)
    metadata.delete(@message_key)
    metadata["tag"] = tag

    unomaly_event["metadata"] = metadata

    if @debug
      log.info "Event #{unomaly_event.to_json}"
    end
    documents.push(unomaly_event)

    if documents.length >= @batch_size
      send_batch(documents)
      documents = []
    end
  end
  # Send whatever is left over, but skip the request when nothing remains.
  send_batch(documents) unless documents.empty?
end
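
The per-event conversion and the batching performed by write can be sketched in isolation. In the example below, `to_unomaly_event` is a hypothetical helper, the default key names `'message'` and `'source'` are assumptions standing in for `@message_key` and `@source_key`, and the sample record is invented. Unlike the plugin code, the sketch copies the record before deleting keys so that the caller's Hash is left unchanged.

```ruby
require 'date'
require 'json'

# Hypothetical helper mirroring the per-event conversion in write.
def to_unomaly_event(tag, time, record, message_key: 'message', source_key: 'source')
  event = {
    message: record[message_key].to_s,
    source: record[source_key].to_s,
    timestamp: Time.at(time).utc.to_datetime.rfc3339
  }

  # Everything except the message and source fields becomes metadata.
  metadata = record.to_hash.dup
  metadata.delete(source_key)
  metadata.delete(message_key)
  metadata['tag'] = tag

  event['metadata'] = metadata
  event
end

record = { 'message' => 'started', 'source' => 'web-1', 'pid' => 42 }
event = to_unomaly_event('app.web', 0, record)
puts event.to_json

# Batching: each_slice yields groups of at most batch_size items, which is
# roughly how write splits a chunk into several send_batch calls.
batch_size = 2
batches = (1..5).each_slice(batch_size).to_a
puts batches.inspect
```

With a `batch_size` of 2 and five events, this yields two full batches followed by a final batch holding the single remaining event.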