class Fluent::SecureForwardOutput
Constants
- DEFAULT_SECURE_CONNECT_PORT
- FORWARD_HEADER
MessagePack FixArray length = 2
- HOSTNAME_PLACEHOLDERS
Attributes
hostname_resolver[R]
nodes[R]
read_interval[R]
socket_interval[R]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_secure_forward.rb, line 91 def configure(conf) hostname = conf.has_key?('hostname') ? conf['hostname'].to_s : Socket.gethostname replace_hostname_placeholder(conf, hostname) super if @secure if @ca_cert_path raise Fluent::ConfigError, "CA cert file not found nor readable at '#{@ca_cert_path}'" unless File.readable?(@ca_cert_path) begin OpenSSL::X509::Certificate.new File.read(@ca_cert_path) rescue OpenSSL::X509::CertificateError raise Fluent::ConfigError, "failed to load CA cert file" end else raise Fluent::ConfigError, "FQDN verification required for certificates issued from public CA" unless @enable_strict_verification log.info "secure connection with valid certificates issued from public CA" end else log.warn "'insecure' mode has vulnerability for man-in-the-middle attacks." end if @keepalive && !@connection_hard_timeout @connection_hard_timeout = @keepalive * 1.2 end @read_interval = @read_interval_msec / 1000.0 @socket_interval = @socket_interval_msec / 1000.0 @nodes = [] @servers.each do |server| node = Node.new(self, server) node.first_session = true @nodes.push node end if @num_threads > @nodes.select{|n| not n.standby}.size log.warn "Too many num_threads for secure-forward: threads should be smaller or equal to non standby servers" end @next_node = 0 @mutex = Mutex.new @hostname_resolver = Resolve::Hostname.new(system_resolver: true, ttl: @expire_dns_cache) true end
node_watcher()
click to toggle source
# File lib/fluent/plugin/out_secure_forward.rb, line 174 def node_watcher reconnectings = Array.new(@nodes.size) nodes_size = @nodes.size loop do sleep @reconnect_interval log.trace "in node health watcher" (0...nodes_size).each do |i| log.trace "node health watcher for #{@nodes[i].host}" next if @nodes[i].established? && ! @nodes[i].expired? && ! @nodes[i].detached? next if reconnectings[i] reason = :expired unless @nodes[i].established? log.warn "dead connection found: #{@nodes[i].host}, reconnecting..." reason = :dead end node = @nodes[i] log.debug "reconnecting to node", host: node.host, port: node.port, state: node.state, expire: node.expire, expired: node.expired?, detached: node.detached? renewed = node.dup renewed.start Thread.pass # to connection thread reconnectings[i] = { conn: renewed, at: Time.now, reason: reason } end (0...nodes_size).each do |i| next unless reconnectings[i] log.trace "checking reconnecting node #{reconnectings[i][:conn].host}" if reconnectings[i][:conn].established? log.debug "connection established for reconnecting node" oldconn = @nodes[i] @nodes[i] = reconnectings[i][:conn] if reconnectings[i][:reason] == :dead log.warn "recovered connection to dead node: #{nodes[i].host}" end log.trace "old connection shutting down" oldconn.detach! if oldconn # connection object doesn't raise any exceptions log.trace "old connection shutted down" reconnectings[i] = nil next end # not connected yet next if reconnectings[i][:at] + @established_timeout > Time.now # not connected yet, and timeout timeout_conn = reconnectings[i][:conn] log.debug "SSL connection is not established until timemout", host: timeout_conn.host, port: timeout_conn.port, timeout: @established_timeout reconnectings[i] = nil timeout_conn.detach! if timeout_conn # connection object doesn't raise any exceptions end end end
replace_hostname_placeholder(conf, hostname)
click to toggle source
# File lib/fluent/plugin/out_secure_forward.rb, line 76 def replace_hostname_placeholder(conf, hostname) replace_element = ->(c) { c.keys.each do |key| v = c[key] if v && v.respond_to?(:include?) && v.respond_to?(:gsub) if HOSTNAME_PLACEHOLDERS.any?{|ph| v.include?(ph) } c[key] = HOSTNAME_PLACEHOLDERS.inject(v){|r, ph| r.gsub(ph, hostname) } end end end c.elements.each{|e| replace_element.call(e) } } replace_element.call(conf) end
select_node(permit_standby=false)
click to toggle source
# File lib/fluent/plugin/out_secure_forward.rb, line 139 def select_node(permit_standby=false) tries = 0 nodes = @nodes.size @mutex.synchronize { n = nil while tries <= nodes n = @nodes[@next_node] @next_node += 1 @next_node = 0 if @next_node >= nodes if n && n.established? && (! n.tained?) && (! n.detached?) && (!n.standby || permit_standby) n.tain! return n end tries += 1 end nil } end
send_data(node, tag, es)
click to toggle source
to forward messages
# File lib/fluent/plugin/out_secure_forward.rb, line 278 def send_data(node, tag, es) ssl = node.sslsession # beginArray(2) ssl.write FORWARD_HEADER # writeRaw(tag) ssl.write tag.to_msgpack # beginRaw(size) sz = es.size # # FixRaw # ssl.write [0xa0 | sz].pack('C') #elsif sz < 65536 # # raw 16 # ssl.write [0xda, sz].pack('Cn') #else # raw 32 ssl.write [0xdb, sz].pack('CN') #end # writeRawBody(packed_es) es.write_to(ssl) end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_secure_forward.rb, line 243 def shutdown super @nodewatcher.kill @nodewatcher.join @nodes.each do |node| node.detach! node.join end end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_secure_forward.rb, line 160 def start super log.debug "starting secure-forward" OpenSSL::Random.seed(SecureRandom.random_bytes(16)) log.debug "start to connect target nodes" @nodes.each do |node| log.debug "connecting node", host: node.host, port: node.port node.start end @nodewatcher = Thread.new(&method(:node_watcher)) @nodewatcher.abort_on_exception = true end
write_objects(tag, es)
click to toggle source
# File lib/fluent/plugin/out_secure_forward.rb, line 255 def write_objects(tag, es) node = select_node || select_node(true) unless node raise "no one nodes with valid ssl session" end log.trace "selected node", host: node.host, port: node.port, standby: node.standby begin send_data(node, tag, es) node.release! rescue Errno::EPIPE, IOError, OpenSSL::SSL::SSLError => e log.warn "Failed to send messages to #{node.host}, parging.", error_class: e.class, error: e node.release! node.detach! raise # to retry #write_objects end end