class Fluent::Plugin::NatsStreamingOutput
Constants
- DEFAULT_FORMAT_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 53
# Runs the superclass initializer, then marks the streaming client handle
# (@sc) as not yet created; #run assigns the real client later.
def initialize
  super
  @sc = nil
end
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 141
# Runs the superclass close step, then closes the streaming client if one
# was ever created. Nothing is closed when @sc is still unset.
def close
  super
  return unless @sc

  @sc.close
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 58
# Applies the plugin configuration.
#
# After the superclass has processed +conf+, this builds the option hash
# later handed to the client's connect call (@sc_config) and prepares the
# record formatter (@formatter_proc) from the <format> section.
#
# conf - configuration element; must respond to #elements.
#
# Raises Fluent::ConfigError when the <format> section, or its @type, is
# missing.
def configure(conf)
  super

  @sc_config = {
    servers: ["nats://#{server}"],
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
    connect_timeout: @connect_timeout
  }

  format_section = conf.elements('format').first
  raise Fluent::ConfigError, "<format> section is required." unless format_section
  raise Fluent::ConfigError, "format/@type is required." unless format_section["@type"]

  @formatter_proc = setup_formatter(format_section)
end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 49
# Always answers true.
# NOTE(review): presumably tells Fluentd that buffered chunks hold msgpack
# event data -- confirm against the Fluentd output plugin API.
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 45
# Always answers true.
# NOTE(review): presumably declares that this output may run under multiple
# Fluentd workers -- confirm against the Fluentd output plugin API.
def multi_workers_ready?
  true
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 118
# Publishes every event of an event stream, one message per record.
#
# tag - subject the messages are published under.
# es  - event stream yielding (time, record) pairs.
#
# Each record is serialized with the formatter prepared in #configure
# (@formatter_proc), the same one #write uses. A bare `format(...)` call is
# avoided here: this class does not define #format in the visible source, so
# that call would resolve to an inherited method or Kernel#format instead of
# the configured <format> section.
#
# Returns whatever +es.each+ returns.
def process(tag, es)
  es = inject_values_to_event_stream(tag, es)
  es.each do |time, record|
    @sc.publish(tag, @formatter_proc.call(tag, time, record))
  end
end
run()
click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 83
# Body of the background thread started by #start.
#
# Creates the streaming client, connects it with the options built in
# #configure, then keeps flushing the underlying NATS connection for as long
# as the thread is flagged as running.
def run
  @sc = STAN::Client.new
  log.info "connect nats server nats://#{server} #{cluster_id} #{client_id}"

  # Dots in the configured client id are replaced with underscores before
  # the id is handed to the client.
  connect_id = @client_id.tr('.', '_')
  @sc.connect(@cluster_id, connect_id, nats: @sc_config)
  log.info "connected"

  while thread_current_running?
    log.trace "test connection"
    @sc.nats.flush(@reconnect_time_wait)
    sleep(5)
  end
end
setup_formatter(conf)
click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 97
# Builds the callable used to serialize a record before publishing.
#
# conf - the <format> configuration section; its '@type' picks the formatter.
#
# 'json'  - dumps the record with Oj, or with Yajl when Oj cannot be loaded.
# 'ltsv'  - dumps the record with LTSV.
# other   - creates a formatter through formatter_create, keeps it in
#           @formatter, and returns its #format method.
#
# Returns an object that is called with (tag, time, record); the json and
# ltsv variants ignore tag and time.
def setup_formatter(conf)
  case conf['@type']
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      proc { |_tag, _time, record| Oj.dump(record) }
    rescue LoadError
      require 'yajl'
      proc { |_tag, _time, record| Yajl::Encoder.encode(record) }
    end
  when 'ltsv'
    require 'ltsv'
    proc { |_tag, _time, record| LTSV.dump(record) }
  else
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 78
# Runs the superclass start step, then creates the
# :nats_streaming_output_main thread with #run as its body.
def start
  super
  runner = method(:run)
  thread_create(:nats_streaming_output_main, &runner)
end
terminate()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 146
# Runs the superclass terminate step, then drops the reference to the
# streaming client.
def terminate
  super
  @sc = nil
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 125
# Publishes every record of a buffered chunk under the chunk's tag.
#
# chunk - buffer chunk; must respond to #empty?, #metadata and #each
#         (yielding time and record).
#
# Does nothing for an empty chunk. Each record is serialized with
# @formatter_proc and published with the configured @timeout; the number of
# published messages is reported at debug level.
def write(chunk)
  return if chunk.empty?

  tag = chunk.metadata.tag
  sent = 0

  chunk.each do |time, record|
    payload = @formatter_proc.call(tag, time, record)
    log.trace "Send record: #{payload}"
    @sc.publish(tag, payload, { timeout: @timeout })
    sent += 1
  end

  log.debug { "#{sent} messages send." } if sent > 0
end