class Fluent::Plugin::OutMqtt
Constants
- DEFAULT_BUFFER_TYPE
- LIMIT_MQTT
Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 40 def initialize super @clients = {} @connection_options = {} @collection_options = {:capped => false} end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 48 def configure(conf) compat_parameters_convert(conf, :buffer, :inject, :formatter) super @bind ||= conf['bind'] @topic ||= conf['topic'] @port ||= conf['port'] @formatter = formatter_create if conf.has_key?('buffer_chunk_limit') #check buffer_size conf['buffer_chunk_limit'] = available_buffer_chunk_limit(conf) end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 82 def format(tag, time, record) [time, record].to_msgpack end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 86 def formatted_to_msgpack_binary true end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 90 def multi_workers_ready? true end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 77 def shutdown @connect.disconnect super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 61 def start log.debug "start mqtt #{@bind}" opts = {host: @bind, port: @port} opts[:client_id] = @client_id if @client_id opts[:username] = @username if @username opts[:password] = @password if @password opts[:ssl] = @ssl if @ssl opts[:ca_file] = @ca if @ca opts[:cert_file] = @cert if @cert opts[:key_file] = @key if @key @connect = MQTT::Client.connect(opts) super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 94 def write(chunk) tag = chunk.metadata.tag chunk.msgpack_each { |time, record| record = inject_values_to_record(tag, time, record) log.debug "write #{@topic} #{@formatter.format(tag,time,record)}" @connect.publish(@topic, @formatter.format(tag,time,record), retain=@retain) } end
Private Instance Methods
available_buffer_chunk_limit(conf)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 107 def available_buffer_chunk_limit(conf) if conf['buffer_chunk_limit'] > LIMIT_MQTT log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{LIMIT_MQTT}" LIMIT_MQTT else conf['buffer_chunk_limit'] end end