class Fluent::Riak2Output
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_riak2.rb, line 14 def initialize super require 'riak' require 'msgpack' require 'uuidtools' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_riak2.rb, line 21 def configure(conf) super @nodes = @nodes.split(',').map do |s| ip,port = s.split(':') { :host => ip, :pb_port => port.to_i } end $log.info "riak nodes=#{@nodes}" end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_riak2.rb, line 65 def format(tag, time, record) [time, tag, record].to_msgpack end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_riak2.rb, line 30 def start $log.debug " => #{@buffer.chunk_limit} #{@buffer.queue_limit} " @client = Riak::Client.new(:nodes => @nodes) @bucket = @client.bucket(@bucket_name) @buf = {} # $log.debug "riak2_metadata_bucket_type => #{@riak2_metadata_bucket_type}" # $log.debug "bucket_type => #{@bucket_type}" unless @riak2_metadata_bucket_type.empty? # Here we are storing our bucket type and bucket name in a metadata map. This allows clients to query that map to see a list of all fluentd buckets. # bucket_type/name/key is returns a metadata map # config defined bucket type metadata_bucket_type = @riak2_metadata_bucket_type # bucket name metadata_bucket_name = "fluent-plugin-riak2-metadata" # root level key for our metadata map metadata_key = "fluent-plugin-riak2-metadata-key" # our metadata map has a kv where: # 1. key is set_of_logfile_buckets_key # 2. value is a set of strings. #each string represents the bucket type and name for a single logfile set_of_logfile_buckets_key = "all_buckets" # inner key for our set of all logfile bucket type/name mdbucket = @client.bucket(metadata_bucket_name) Riak::Crdt::DEFAULT_BUCKET_TYPES[:map] = metadata_bucket_type map = Riak::Crdt::Map.new(mdbucket, metadata_key) map.sets[set_of_logfile_buckets_key].add "#{@bucket_type} #{@bucket_name}" end super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_riak2.rb, line 69 def write(chunk) $log.debug " <<<<<===========\n" records = [] chunk.msgpack_each do |time, tag, record| record[@tag_key] = tag records << record $log.debug record end put_now(records) end
Private Instance Methods
put_now(records)
click to toggle source
# File lib/fluent/plugin/out_riak2.rb, line 82 def put_now(records) unless records.empty? threads = [] records.each do |record| # if you put log statements here, # you must take care to NOT forward fluentd's logs to riak. # you will trigger a recursive avalance of riak storage activity. now = DateTime.now.strftime("%Y%m%d%H%M%S") key = "#{now}-#{UUIDTools::UUID.timestamp_create.to_s}" # $log.debug "#{@bucket_name} #{key} \n" threads << Thread.new { begin robj = Riak::RObject.new(@bucket, key) robj.content_type = "application/json" robj.raw_data = record.to_json robj.store(type: @bucket_type) rescue => e $log.error "ERROR #{e}" end } end end end