class Fluent::UnomalyOutput
Public Instance Methods
This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
# File lib/fluent/plugin/out_unomaly.rb, line 38 def configure(conf) super conf["buffer_chunk_limit"] ||= "500k" conf["flush_interval"] ||= "1s" end
# File lib/fluent/plugin/out_unomaly.rb, line 125 def flatten(data, prefix) ret = {} if data.is_a? Hash data.each { |key, value| if prefix.to_s.empty? ret.merge! flatten(value, "#{key.to_s}") else ret.merge! flatten(value, "#{prefix}.#{key.to_s}") end } elsif data.is_a? Array data.each_with_index {|val,index | ret.merge! flatten(val, "#{prefix}.#{index}")} else return {prefix => data.to_s} end ret end
This method is called when an event reaches to Fluentd. Convert the event to a raw string.
# File lib/fluent/plugin/out_unomaly.rb, line 58 def format(tag, time, record) [tag, time, record].to_msgpack end
# File lib/fluent/plugin/out_unomaly.rb, line 98 def send_batch(events) url = @host + @api_path body = events.to_json ssl = url.start_with?('https') uri = URI.parse(url) header = {'Content-Type' => 'application/json'} http = Net::HTTP.new(uri.host, uri.port) if ssl http.use_ssl = true if @accept_self_signed_certs http.verify_mode = OpenSSL::SSL::VERIFY_NONE end end log.info "Sending #{events.length} events to unomaly at #{@host}#{@api_path} (batch_size=#{@batch_size})" request = Net::HTTP::Post.new(uri.request_uri, header) request.body = body resp = http.request(request) if !resp.kind_of? Net::HTTPSuccess log.error "Error sending batch #{resp.to_s}" end end
This method is called when shutting down. Shutdown the thread and close sockets or files here.
# File lib/fluent/plugin/out_unomaly.rb, line 52 def shutdown super end
This method is called when starting. Open sockets or files here.
# File lib/fluent/plugin/out_unomaly.rb, line 46 def start super end
This method is called every flush interval. Write the buffer chunk to files or databases here. 'chunk' is a buffer chunk that includes multiple formatted events. You can use 'data = chunk.read' to get all events and 'chunk.open {|io| … }' to get IO objects.
NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins.
# File lib/fluent/plugin/out_unomaly.rb, line 69 def write(chunk) documents = [] chunk.msgpack_each do |(tag, time, record)| unomaly_event = { message: record[@message_key].to_s, source: record[@source_key].to_s, timestamp: Time.at(time).utc.to_datetime.rfc3339 } metadata = record.to_hash metadata.delete(@source_key) metadata.delete(@message_key) metadata["tag"]=tag unomaly_event["metadata"]=metadata if @debug log.info "Event #{unomaly_event.to_json}" end documents.push(unomaly_event) if documents.length >= @batch_size send_batch(documents) documents = [] end end send_batch(documents) end