class Fluent::Plugin::ForwardOutput
Constants
- ACKWaitingSockInfo
- FORWARD_HEADER
MessagePack FixArray header for an array of length 3.
- LISTEN_PORT
Attributes
nodes[R]
read_interval[R]
recover_sample_size[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Output::new
# File lib/fluent/plugin/out_forward.rb, line 133
# Creates the plugin with an empty node list and with every runtime handle
# unset. Some of these handles (@usock, @sock_ack_waiting and its mutex) are
# assigned later in #start.
def initialize
  super

  @nodes = [] # holds Node instances

  # Nothing is running yet, so every runtime handle starts out as nil.
  @loop = @thread = @usock = nil
  @sock_ack_waiting = @sock_ack_waiting_mutex = nil
end
Public Instance Methods
close()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#close
# File lib/fluent/plugin/out_forward.rb, line 258
# Closes the UDP socket opened by #start (when there is one) and then hands
# over to the superclass.
def close
  begin
    @usock.close if @usock
  rescue StandardError
    # Deliberately ignored: this socket will not be used anyway.
    nil
  end

  super
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_forward.rb, line 145
# Validates the configuration and builds the list of destination nodes.
#
# conf - the configuration element; it is first normalised with
#        compat_parameters_convert (buffer section, default chunk key 'tag')
#        and then handed to the superclass.
#
# Raises Fluent::ConfigError when
#   * the buffer chunk keys do not include the tag,
#   * dns_round_robin is combined with the :udp heartbeat type,
#   * a TLS certificate path does not exist or is not readable,
#   * no <server> produced a node,
#   * ack_response_timeout is smaller than 1.
# Host resolution errors raised by Node#validate_host_resolution! are
# re-raised unless @ignore_network_errors_at_startup is set.
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

  super

  unless @chunk_key_tag
    raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
  end

  # Derived values: read interval in seconds, and the number of heartbeat
  # intervals that fit into the recover wait.
  @read_interval = @read_interval_msec / 1000.0
  @recover_sample_size = @recover_wait / @heartbeat_interval

  if @heartbeat_type == :tcp
    log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
    @heartbeat_type = :transport
  end

  if @dns_round_robin && @heartbeat_type == :udp
    raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
  end

  if @transport == :tls
    if @tls_cert_path && !@tls_cert_path.empty?
      @tls_cert_path.each do |path|
        raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
        raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
      end
    end

    if @tls_insecure_mode
      # Insecure mode switches off hostname verification and accepts
      # self-signed certificates.
      log.warn "TLS transport is configured in insecure way"
      @tls_verify_hostname = false
      @tls_allow_self_signed_cert = true
    end
  end

  @servers.each do |server|
    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
    name = server.name || "#{server.host}:#{server.port}"

    log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
    if @heartbeat_type == :none
      @nodes << NoneHeartbeatNode.new(self, server, failure: failure)
    else
      node = Node.new(self, server, failure: failure)
      begin
        node.validate_host_resolution!
      rescue => e
        raise unless @ignore_network_errors_at_startup
        # Keep the node in the list, but disabled, so startup can continue.
        log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
        node.disable!
      end
      @nodes << node
    end
  end

  unless @as_secondary
    if @compress == :gzip && @buffer.compress == :text
      @buffer.compress = :gzip
    elsif @compress == :text && @buffer.compress == :gzip
      # Single-line message: the previous literal carried a stray line break
      # in the middle of the sentence.
      log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
    end
  end

  if @nodes.empty?
    raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
  end

  raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end
create_transfer_socket(host, port, hostname, &block)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 316
# Opens a socket to a destination using the configured transport.
#
# host     - destination address, passed to the socket helper.
# port     - destination port, passed to the socket helper.
# hostname - passed as the fqdn option for TLS; not used for plain TCP.
# block    - forwarded unchanged to the socket helper.
#
# Returns whatever socket_create_tls / socket_create_tcp returns.
# Raises RuntimeError for any transport other than :tls or :tcp.
def create_transfer_socket(host, port, hostname, &block)
  # Both transports share the same timeout settings.
  timeouts = {
    linger_timeout: @send_timeout,
    send_timeout: @send_timeout,
    recv_timeout: @ack_response_timeout,
  }

  if @transport == :tls
    socket_create_tls(
      host, port,
      version: @tls_version,
      ciphers: @tls_ciphers,
      insecure: @tls_insecure_mode,
      verify_fqdn: @tls_verify_hostname,
      fqdn: hostname,
      allow_self_signed_cert: @tls_allow_self_signed_cert,
      cert_paths: @tls_cert_path,
      **timeouts,
      &block
    )
  elsif @transport == :tcp
    socket_create_tcp(host, port, **timeouts, &block)
  else
    raise "BUG: unknown transport protocol #{@transport}"
  end
end
forward_header()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 348
# Reader for the FORWARD_HEADER constant.
#
# Returns FORWARD_HEADER.
def forward_header
  FORWARD_HEADER
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 218
# Reports whether this plugin is ready for multiple workers.
#
# Returns true unconditionally.
def multi_workers_ready?
  true
end
prefer_delayed_commit()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 222
# Reports whether delayed commit is preferred.
#
# Returns the value of @require_ack_response.
def prefer_delayed_commit
  @require_ack_response
end
select_a_healthy_node() { |node| ... }
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 294
# Walks the weight array round-robin, starting after the previous position
# (@rr), and yields the first available node. When the block raises, the
# next slot is tried, so every slot is visited at most once per call.
#
# Yields the candidate node.
#
# Returns a two-element array: the block's result and the node that
# produced it.
# Raises the most recent error raised by the block when every attempt
# failed, or NoNodesAvailable when no available node was found at all.
def select_a_healthy_node
  last_error = nil
  slots = @weight_array.length

  slots.times do
    @rr = (@rr + 1) % slots
    candidate = @weight_array[@rr]
    next unless candidate.available?

    begin
      result = yield candidate
      return result, candidate
    rescue => e
      # for load balancing during detecting crashed servers:
      # remember only the latest failure and move on to the next slot
      last_error = e
    end
  end

  raise last_error if last_error
  raise NoNodesAvailable, "no nodes are available"
end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Output#start
# File lib/fluent/plugin/out_forward.rb, line 226
# Starts the plugin: adjusts the delayed-commit timeout, builds the initial
# weight array, sets up heartbeating (unless the heartbeat type is :none)
# and, when ack responses are required, starts the ack reader thread.
def start
  super

  # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
  # But it should be overwritten by ack_response_timeout to rollback chunks after timeout
  if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout
    log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
    # +2 because the minimum ack_reader IO.select interval is 1s
    @delayed_commit_timeout = @ack_response_timeout + 2
  end

  # The seed is fixed once here and reused by every rebuild_weight_array call.
  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  if @heartbeat_type == :udp
    # One shared UDP socket both sends heartbeats and receives the replies.
    first_node = @nodes.first
    @usock = socket_create_udp(first_node.host, first_node.port, nonblock: true)
    server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length) do |data, sock|
      on_heartbeat(Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host), data)
    end
  end

  unless @heartbeat_type == :none
    timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_timer))
  end

  return unless @require_ack_response

  @sock_ack_waiting_mutex = Mutex.new
  @sock_ack_waiting = []
  thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end
try_write(chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 278
# Sends a chunk to one available node and registers the socket in the list
# of sockets waiting for an ack. The ack_reader thread processes that list.
# An empty chunk is committed immediately without sending anything.
#
# chunk - the buffer chunk to deliver.
def try_write(chunk)
  log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)

  if chunk.empty?
    commit_write(chunk.unique_id)
    return
  end

  tag = chunk.metadata.tag
  sock, node = select_a_healthy_node { |candidate| candidate.send_data(tag, chunk) }

  # The Base64 form of the chunk id is what read_ack_from_sock compares with
  # the 'ack' field of the response.
  waiting = ACKWaitingSockInfo.new(
    sock,
    chunk.unique_id,
    Base64.encode64(chunk.unique_id),
    node,
    Fluent::Clock.now,
    @ack_response_timeout
  )
  @sock_ack_waiting_mutex.synchronize { @sock_ack_waiting << waiting }
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 266
# Sends a chunk to one available node without waiting for an ack.
#
# chunk - the buffer chunk to deliver; empty chunks are skipped.
#
# Returns nil for an empty chunk, otherwise the result of
# select_a_healthy_node.
def write(chunk)
  unless chunk.empty?
    tag = chunk.metadata.tag
    select_a_healthy_node { |candidate| candidate.send_data(tag, chunk) }
  end
end
Private Instance Methods
ack_reader()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 458
# Body of the ack receiving thread (started from #start when ack responses
# are required). While the thread is running it repeatedly
#   1. rolls back every waiting entry whose ack has expired, disabling its
#      node and closing its socket,
#   2. waits up to select_interval seconds for the remaining sockets to
#      become readable,
#   3. reads the ack from each readable socket and commits the chunk.
#
# Errors raised inside one iteration are logged and the loop continues.
def ack_reader
  # Poll once per second at most; for short timeouts poll three times per
  # timeout period.
  select_interval = if @delayed_commit_timeout > 3
                      1
                    else
                      @delayed_commit_timeout / 3.0
                    end

  unpacker = Fluent::Engine.msgpack_unpacker

  while thread_current_running?
    now = Fluent::Clock.now
    sockets = []
    begin
      @sock_ack_waiting_mutex.synchronize do
        new_list = []
        @sock_ack_waiting.each do |info|
          if info.expired?(now)
            # There are 2 types of cases when no response has been received from socket:
            # (1) the node does not support sending responses
            # (2) the node does support sending response but responses have not arrived for some reasons.
            log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
            info.node.disable!
            info.sock.close rescue nil
            rollback_write(info.chunk_id)
          else
            sockets << info.sock
            new_list << info
          end
        end
        # The list is replaced only after all entries were handled, so an
        # error raised above leaves the previous list in place.
        @sock_ack_waiting = new_list
      end

      readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
      next unless readable_sockets

      readable_sockets.each do |sock|
        chunk_id = read_ack_from_sock(sock, unpacker)
        # read_ack_from_sock returns nil when the chunk must NOT be committed
        # (connection closed by the peer, or ack mismatch); committing a nil
        # chunk id would be wrong, so only real ids are committed.
        commit_write(chunk_id) if chunk_id
      end
    rescue => e
      log.error "unexpected error while receiving ack", error: e
      log.error_backtrace
    end
  end
end
on_heartbeat(sockaddr, msg)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 411
# Handles an incoming heartbeat packet: finds the node whose sockaddr
# matches the sender and records the heartbeat on it. The weight array is
# rebuilt when Node#heartbeat returns a truthy value.
#
# sockaddr - packed socket address of the sender.
# msg      - the received payload (not inspected here).
def on_heartbeat(sockaddr, msg)
  sender = @nodes.find { |candidate| candidate.sockaddr == sockaddr }
  # Packets from addresses that match no configured node are ignored.
  return unless sender

  rebuild_weight_array if sender.heartbeat
end
on_timer()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 396
# Periodic heartbeat task: ticks every node (rebuilding the weight array
# when Node#tick returns a truthy value) and then sends it a heartbeat.
# The send failures listed in the rescue clause are logged at debug level
# and do not stop the loop; any other error propagates.
def on_timer
  @nodes.each do |node|
    rebuild_weight_array if node.tick

    begin
      log.trace "sending heartbeat", host: node.host, port: node.port, heartbeat_type: @heartbeat_type
      # Hand the shared UDP socket (if any) to the node before sending.
      node.usock = @usock if @usock
      node.send_heartbeat
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED => e
      log.debug "failed to send heartbeat packet", host: node.host, port: node.port, heartbeat_type: @heartbeat_type, error: e
    end
  end
end
read_ack_from_sock(sock, unpacker)
click to toggle source
Returns the id of the chunk to be committed, or nil when the chunk must not be committed.
# File lib/fluent/plugin/out_forward.rb, line 421
# Reads one ack response from a socket and checks it against the chunk that
# was sent over that socket.
#
# sock     - a socket reported readable by IO.select.
# unpacker - the MessagePack unpacker used to decode the response.
#
# Returns the chunk id to be committed when the response carries the
# expected ack. Returns nil when the chunk must not be committed:
#   * the peer closed or reset the connection (node is disabled, chunk is
#     rolled back),
#   * the ack does not match the chunk id (chunk is rolled back),
#   * an unexpected error occurred (logged).
# In every case the waiting entry for the socket is removed afterwards.
#
# NOTE(review): the socket itself is not closed here, while the expiry path
# in ack_reader does close it. Confirm where the socket is released after a
# response was read.
def read_ack_from_sock(sock, unpacker)
  begin
    raw_data = sock.recv(@read_length)
  rescue Errno::ECONNRESET
    # Treat a reset like EOF.
    raw_data = ""
  end
  info = @sock_ack_waiting_mutex.synchronize { @sock_ack_waiting.find { |i| i.sock == sock } }

  # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
  # If this happens we assume the data wasn't delivered and retry it.
  if raw_data.empty?
    log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port
    info.node.disable!
    # Roll back explicitly so the chunk really is retried, as on the
    # mismatch and expiry paths.
    rollback_write(info.chunk_id)
    return nil
  end

  unpacker.feed(raw_data)
  res = unpacker.read
  log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res

  if res['ack'] != info.chunk_id_base64
    # Some errors may have occurred when ack and chunk id are different, so send the chunk again.
    log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
    rollback_write(info.chunk_id)
    return nil
  end

  log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
  info.chunk_id
rescue => e
  log.error "unexpected error while receiving ack message", error: e
  log.error_backtrace
  # Explicit nil: the caller must never receive the logger's return value
  # as if it were a chunk id.
  nil
ensure
  @sock_ack_waiting_mutex.synchronize do
    @sock_ack_waiting.delete(info)
  end
end
rebuild_weight_array()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 354
# Rebuilds @weight_array, the slot list that select_a_healthy_node walks
# round-robin. Each node gets a number of slots proportional to its weight.
#
# Regular (non-standby) nodes are always included, available or not.
# Available standby nodes are added, in order, only while the weight lost to
# unavailable regular nodes has not yet been compensated.
#
# When no node contributes a slot (for example only standby nodes with
# nothing to compensate, or all weights zero) the weight array is left empty
# instead of raising ZeroDivisionError.
#
# Returns the new weight array.
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition { |n| n.standby? }

  # Total weight of the regular nodes that are currently unavailable.
  lost_weight = regular_nodes.reject { |n| n.available? }.inject(0) { |sum, n| sum + n.weight }
  log.debug "rebuilding weight array", lost_weight: lost_weight

  if lost_weight > 0
    standby_nodes.each do |n|
      next unless n.available?

      regular_nodes << n
      log.warn "using standby node #{n.host}:#{n.port}", weight: n.weight
      lost_weight -= n.weight
      break if lost_weight <= 0
    end
  end

  weight_array = []
  # Dividing by the GCD keeps the array as small as the weight ratios allow.
  gcd = regular_nodes.map { |n| n.weight }.inject(0) { |r, w| r.gcd(w) }
  unless gcd.zero?
    regular_nodes.each do |n|
      (n.weight / gcd).times { weight_array << n }
    end
  end

  # Nothing to balance: both divisions below would divide by zero.
  return @weight_array = weight_array if weight_array.empty?

  # for load balancing during detecting crashed servers
  coe = (regular_nodes.size * 6) / weight_array.size
  weight_array *= coe if coe > 1

  # Shuffle with the seed fixed in #start so that every rebuild of the same
  # node set produces the same order.
  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end