class Fluent::RawTcpOutput

Constants

RawNodeConfig

Attributes

nodes[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 6
# Sets up a new output with an empty node list.
#
# The standard-library dependencies are loaded here, at construction time,
# rather than when this file itself is loaded.
def initialize
  super
  %w[socket timeout fileutils].each { |library| require library }
  @nodes = []  # Node objects are appended to this list by #configure
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 18
# Registers one Node for every "server" element found in the configuration.
#
# For each element of +conf.elements+ whose name is "server":
# * 'host' is read as-is,
# * 'port' is converted with +to_i+, or DEFAULT_LISTEN_PORT is used when the
#   attribute is absent,
# * 'name' falls back to "host:port" when the attribute is absent.
#
# Elements with any other name are skipped. The superclass gets +conf+ first.
def configure(conf)
  super

  conf.elements.each do |element|
    next unless element.name == "server"

    host = element['host']
    raw_port = element['port']
    port = raw_port ? raw_port.to_i : DEFAULT_LISTEN_PORT

    name = element['name'] || "#{host}:#{port}"

    settings = RawNodeConfig.new(name, host, port)
    @nodes << Node.new(log, settings)
    log.info "adding forwarding server '#{name}'", :host=>host, :port=>port
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 47
# Serializes a single event.
#
# Packs +tag+, +time+ and +record+ into a three-element array, in that
# order, and returns the result of calling +to_msgpack+ on it.
# NOTE(review): +to_msgpack+ is not defined in this file; presumably it comes
# from the msgpack library loaded elsewhere -- confirm.
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 43
# Shutdown hook. Adds no behavior of its own; it only invokes the
# superclass implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 39
# Start hook. Adds no behavior of its own; it only invokes the
# superclass implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 51
# Delivers a chunk to the first node that accepts it.
#
# Nodes are tried in the order they appear in @nodes. The method returns as
# soon as one send_data call finishes without raising; later nodes are then
# not contacted. An empty chunk is ignored without contacting any node.
#
# Every failed attempt is reported through the plugin logger, so a failover
# to a later node leaves a trace instead of being silently swallowed.
#
# Returns nil.
# Raises the error from the last node tried when every node fails, or a
# RuntimeError ("No nodes available") when @nodes is empty.
def write(chunk)
  return if chunk.empty?

  last_error = nil

  @nodes.each do |node|
    begin
      send_data(node, chunk)
      return
    rescue => e
      # Only the most recent failure is re-raised below, so log each one here
      # to keep the earlier ones visible.
      last_error = e
      log.warn "failed to send data to node, trying next one", :error_class=>e.class.to_s, :error=>e.to_s
    end
  end

  raise last_error if last_error
  raise "No nodes available"
end

Private Instance Methods

connect(node) click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 88
# Opens a TCP connection to +node+ and returns the socket.
#
# The connection attempt (TCPSocket.new on node.resolved_host / node.port)
# runs inside Timeout.timeout bounded by @connect_timeout.
# NOTE(review): @connect_timeout is not assigned in the visible code;
# presumably it is a configuration parameter declared elsewhere -- confirm.
def connect(node)
  Timeout.timeout(@connect_timeout) { TCPSocket.new(node.resolved_host, node.port) }
end
send_data(node, chunk) click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 70
# Streams the records of +chunk+ to +node+ over a fresh connection.
#
# A socket is obtained from #connect, configured with SO_LINGER and
# SO_SNDTIMEO (both derived from @send_timeout, truncated to an integer), and
# then every [tag, time, record] triple yielded by chunk.msgpack_each is
# written as its own to_msgpack payload. Triples whose record is not a Hash
# are skipped. The socket is always closed, even when a write raises.
def send_data(node, chunk)
  socket = connect(node)
  begin
    timeout_secs = @send_timeout.to_i

    # struct linger { int l_onoff; int l_linger; }
    linger = [1, timeout_secs].pack('I!I!')
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    # struct timeval: whole seconds, zero microseconds
    send_limit = [timeout_secs, 0].pack('L!L!')
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_limit)

    chunk.msgpack_each do |tag, time, record|
      socket.write([tag, time, record].to_msgpack) if record.is_a?(Hash)
    end
  ensure
    socket.close
  end
end