class Fluent::AerospikeOutput
Attributes
Public Class Methods
# File lib/fluent/plugin/out_aerospike.rb, line 20 def initialize super require 'aerospike' require 'msgpack' require 'uuidtools' end
Public Instance Methods
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
# File lib/fluent/plugin/out_aerospike.rb, line 30 def configure(conf) super # You can also refer raw parameter via conf[name]. write_policy = Aerospike::WritePolicy.new( Aerospike::RecordExistsAction::CREATE_ONLY, nil, nil, @ttl, nil ) end
This method is called when an event reaches to Fluentd. Convert the event to a raw string. [tag, time, record].to_json + “n” Alternatively, use msgpack to serialize the object. [tag, time, record].to_msgpack
# File lib/fluent/plugin/out_aerospike.rb, line 61 def format(tag, time, record) [tag, time, record].to_msgpack end
# File lib/fluent/plugin/out_aerospike.rb, line 83 def get_client(address) host_port = address.split(':', 2) return Aerospike::Client.new(host_port[0], host_port[1]) end
This method is called when shutting down. Shutdown the thread and close sockets or files here.
# File lib/fluent/plugin/out_aerospike.rb, line 51 def shutdown @client.close super end
This method is called when starting. Open sockets or files here.
# File lib/fluent/plugin/out_aerospike.rb, line 44 def start super @client = get_client(@address) end
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins. Optionally, you can use chunk.msgpack_each to deserialize objects.
# File lib/fluent/plugin/out_aerospike.rb, line 73 def write(chunk) chunk.msgpack_each {|(tag,time,record)| # key_s = "#{time_key.nil? time : record[time_key]}-#{UUIDTools::UUID.random_create}" key_s = "#{record[@time_key] || time}-#{UUIDTools::UUID.random_create}" set_s = @set || tag key = Aerospike::Key.new(@namespace, set_s, key_s) @client.put(key, record, write_policy) } end