class Fluent::Plugin::OutMqtt

Constants

DEFAULT_BUFFER_TYPE
LIMIT_MQTT

The following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 40
def initialize
  super

  @clients = {}
  @connection_options = {}
  @collection_options = {:capped => false}
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 48
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject, :formatter)
  super
  @bind ||= conf['bind']
  @topic ||= conf['topic']
  @port ||= conf['port']
  @formatter = formatter_create
  if conf.has_key?('buffer_chunk_limit')
    #check buffer_size
    conf['buffer_chunk_limit'] = available_buffer_chunk_limit(conf)
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 82
def format(tag, time, record)
  [time, record].to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 86
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 90
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 77
def shutdown
  @connect.disconnect
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 61
def start

  log.debug "start mqtt #{@bind}"
  opts = {host: @bind,
          port: @port}
  opts[:client_id] = @client_id if @client_id
  opts[:username] =  @username if @username
  opts[:password] = @password if @password
  opts[:ssl] = @ssl if @ssl
  opts[:ca_file] = @ca if @ca
  opts[:cert_file] = @cert if @cert
  opts[:key_file] = @key if @key
  @connect = MQTT::Client.connect(opts)
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 94
def write(chunk)
  tag = chunk.metadata.tag
  chunk.msgpack_each { |time, record|
    record = inject_values_to_record(tag, time, record)
    log.debug "write #{@topic} #{@formatter.format(tag,time,record)}"
    @connect.publish(@topic, @formatter.format(tag,time,record), retain=@retain)
  }
end

Private Instance Methods

available_buffer_chunk_limit(conf) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 107
def available_buffer_chunk_limit(conf)
  if conf['buffer_chunk_limit'] > LIMIT_MQTT
    log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{LIMIT_MQTT}"
    LIMIT_MQTT
  else
    conf['buffer_chunk_limit']
  end
end