class Fluent::AerospikeOutput

Attributes

write_policy[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_aerospike.rb, line 20
def initialize
  super
  require 'aerospike'
  require 'msgpack'
  require 'uuidtools'
end

Public Instance Methods

configure(conf) click to toggle source

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.

Calls superclass method
# File lib/fluent/plugin/out_aerospike.rb, line 30
def configure(conf)
  super
  # You can also refer raw parameter via conf[name].
  write_policy = Aerospike::WritePolicy.new(
    Aerospike::RecordExistsAction::CREATE_ONLY,
    nil,
    nil,
    @ttl,
    nil
  )
end
format(tag, time, record) click to toggle source

This method is called when an event reaches to Fluentd. Convert the event to a raw string. [tag, time, record].to_json + “n” Alternatively, use msgpack to serialize the object. [tag, time, record].to_msgpack

# File lib/fluent/plugin/out_aerospike.rb, line 61
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
get_client(address) click to toggle source
# File lib/fluent/plugin/out_aerospike.rb, line 83
def get_client(address)
  host_port = address.split(':', 2)
  return Aerospike::Client.new(host_port[0], host_port[1])
end
shutdown() click to toggle source

This method is called when shutting down. Shutdown the thread and close sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_aerospike.rb, line 51
def shutdown
  @client.close
  super
end
start() click to toggle source

This method is called when starting. Open sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_aerospike.rb, line 44
def start
  super
  @client = get_client(@address)
end
write(chunk) click to toggle source

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins. Optionally, you can use chunk.msgpack_each to deserialize objects.

# File lib/fluent/plugin/out_aerospike.rb, line 73
    def write(chunk)
      chunk.msgpack_each {|(tag,time,record)|
#         key_s = "#{time_key.nil? time : record[time_key]}-#{UUIDTools::UUID.random_create}"
        key_s = "#{record[@time_key] || time}-#{UUIDTools::UUID.random_create}"
        set_s = @set || tag
        key = Aerospike::Key.new(@namespace, set_s, key_s)
        @client.put(key, record, write_policy)
      }
    end