class Fluent::RawTcpOutput
Constants
- RawNodeConfig
Attributes
nodes[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 6
# Builds the output plugin: runs the superclass initializer, loads the
# standard libraries listed below, and starts with an empty node list.
def initialize
  super

  # Loaded when the plugin is instantiated rather than when the file is read.
  %w[socket timeout fileutils].each { |lib| require lib }

  @nodes = [] # holds the Node objects appended by #configure
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 18
# Registers one Node per "server" element found in the configuration.
#
# For each such element the 'host', 'port' and 'name' attributes are read;
# a missing port falls back to DEFAULT_LISTEN_PORT and a missing name falls
# back to "host:port". Elements with any other name are ignored.
#
# @param conf the plugin configuration whose `elements` are scanned
def configure(conf)
  super

  conf.elements.each do |element|
    next unless element.name == "server"

    host = element['host']
    raw_port = element['port']
    port = raw_port ? raw_port.to_i : DEFAULT_LISTEN_PORT
    # No explicit name given: identify the server by its address instead.
    name = element['name'] || "#{host}:#{port}"

    settings = RawNodeConfig.new(name, host, port)
    @nodes << Node.new(log, settings)
    log.info "adding forwarding server '#{name}'", :host=>host, :port=>port
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 47
# Serializes a single event as the msgpack encoding of [tag, time, record].
#
# @param tag    the event tag
# @param time   the event time
# @param record the event payload
# @return the result of calling `to_msgpack` on the three-element array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 43
# Shutdown hook. This plugin adds no work of its own here and simply
# delegates to the superclass implementation.
def shutdown
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 39
# Start hook. This plugin adds no work of its own here and simply
# delegates to the superclass implementation.
def start
  super
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 51
# Delivers a chunk to the first configured node that accepts it.
#
# Nodes are tried in the order they were registered. The method returns as
# soon as one send succeeds. If every attempt fails, the error from the last
# node tried is raised; if there are no nodes at all, a RuntimeError with
# the message "No nodes available" is raised. An empty chunk is a no-op.
#
# @param chunk the buffered chunk to send; must respond to `empty?`
def write(chunk)
  return if chunk.empty?

  last_error = nil
  @nodes.each do |node|
    begin
      send_data(node, chunk)
      return # delivered; the remaining nodes are not tried
    rescue => e
      # Remember the failure and move on to the next node.
      last_error = e
    end
  end

  raise(last_error || "No nodes available")
end
Private Instance Methods
connect(node)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 88
# Opens a TCP connection to the given node, bounded by @connect_timeout.
#
# @param node an object exposing `resolved_host` and `port`
# @return [TCPSocket] the connected socket
def connect(node)
  # Both the host lookup on the node and the connect itself run inside the
  # timeout window; the block's value is what the method returns.
  Timeout.timeout(@connect_timeout) do
    TCPSocket.new(node.resolved_host, node.port)
  end
end
send_data(node, chunk)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 70
# Connects to a node and streams every Hash record of the chunk to it,
# each one re-encoded as a msgpack [tag, time, record] array.
#
# The socket is always closed afterwards, whether or not the send succeeded.
#
# @param node  the destination node, passed to #connect
# @param chunk the buffered chunk; iterated with `msgpack_each`
def send_data(node, chunk)
  sock = connect(node)
  begin
    timeout_secs = @send_timeout.to_i

    # struct linger { int l_onoff; int l_linger; }
    linger = [1, timeout_secs].pack('I!I!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    # struct timeval { seconds, microseconds }
    send_timeout = [timeout_secs, 0].pack('L!L!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)

    chunk.msgpack_each do |tag, time, record|
      # Entries whose record is not a Hash are skipped.
      if record.is_a?(Hash)
        sock.write([tag, time, record].to_msgpack)
      end
    end
  ensure
    sock.close
  end
end