MessagePack FixArray length = 3 (if @extend_internal_protocol)
MessagePack FixArray length = 2 (otherwise)
Linux default tcp_syn_retries is 5 (in many environments): 3 + 6 + 12 + 24 + 48 + 96 -> 189 (sec)
# File lib/fluent/plugin/out_forward.rb, line 30
# Loads the libraries this output needs and starts with an empty node list.
# The requires are performed at instantiation time, after the parent
# initializer has run.
def initialize
  super

  %w[base64 socket fileutils fluent/plugin/socket_util].each do |library|
    require library
  end

  @nodes = []  #=> [Node]
end
# File lib/fluent/plugin/out_forward.rb, line 72
# Applies the configuration and registers one Node per <server> element.
#
# A legacy top-level 'host' option (with optional 'port') is first converted
# into an equivalent <server> element, with a warning. Each <server> may set
# 'host', 'port' (default DEFAULT_LISTEN_PORT), 'weight' (default 60),
# 'standby' and 'name' (default "host:port").
def configure(conf)
  super

  # backward compatibility: fold the flat host/port options into a <server>
  legacy_host = conf['host']
  if legacy_host
    log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead."
    legacy_port = conf['port']
    legacy_port = legacy_port ? legacy_port.to_i : DEFAULT_LISTEN_PORT
    server = conf.add_element('server')
    server['host'] = legacy_host
    server['port'] = legacy_port.to_s
  end

  recover_sample_size = @recover_wait / @heartbeat_interval

  # Any option added later that relies on the extended protocol must be
  # taken into account here as well.
  @extend_internal_protocol = @require_ack_response ? true : false

  conf.elements.each do |element|
    next unless element.name == "server"

    host = element['host']

    raw_port = element['port']
    port = raw_port ? raw_port.to_i : DEFAULT_LISTEN_PORT

    raw_weight = element['weight']
    weight = raw_weight ? raw_weight.to_i : 60

    standby = !!element['standby']

    name = element['name'] || "#{host}:#{port}"

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)

    node_conf = NodeConfig.new(
      name, host, port, weight, standby, failure,
      @phi_threshold, recover_sample_size, @expire_dns_cache, @phi_failure_detector
    )
    @nodes << Node.new(log, node_conf)
    log.info "adding forwarding server '#{name}'", :host => host, :port => port, :weight => weight, :plugin_id => plugin_id
  end
end
# File lib/fluent/plugin/out_forward.rb, line 150
# Runs the event loop until it stops. Any StandardError escaping the loop is
# logged together with its backtrace instead of being re-raised.
def run
  begin
    @loop.run
  rescue => failure
    log.error "unexpected error", :error => failure.to_s
    log.error_backtrace
  end
end
# File lib/fluent/plugin/out_forward.rb, line 142
# Stops the plugin: flags it as finished (on_timer checks this flag),
# detaches every watcher, stops the loop, waits for the loop thread to end
# and finally closes the UDP socket when one was opened.
def shutdown
  @finished = true

  @loop.watchers.each(&:detach)
  @loop.stop
  @thread.join

  if @usock
    @usock.close
  end
end
# File lib/fluent/plugin/out_forward.rb, line 119
# Starts the plugin: fixes the shuffle seed, builds the initial weight array,
# creates the event loop with the heartbeat timer (plus a UDP heartbeat
# receiver when heartbeat_type is :udp) and runs that loop in its own thread.
def start
  super

  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  @loop = Coolio::Loop.new

  if @heartbeat_type == :udp
    # assuming all hosts use udp
    first_host = @nodes.first.host
    @usock = SocketUtil.create_udp_socket(first_host)
    @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    heartbeat_callback = method(:on_heartbeat)
    @hb = HeartbeatHandler.new(@usock, heartbeat_callback)
    @loop.attach(@hb)
  end

  timer_callback = method(:on_timer)
  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, timer_callback)
  @loop.attach(@timer)

  @thread = Thread.new { run }
end
# File lib/fluent/plugin/out_forward.rb, line 157
# Sends +chunk+ to the next available node in weighted round-robin order.
#
# Every slot of the weight array is tried at most once, starting after the
# slot used by the previous call (@rr). The method returns as soon as one
# send succeeds.
#
# tag   - tag passed through to send_data.
# chunk - buffer chunk; nothing is done when it is empty.
#
# Returns nil.
# Raises the most recent error raised by send_data when every attempted node
# failed, or a RuntimeError ("no nodes are available") when no node was
# available at all.
def write_objects(tag, chunk)
  return if chunk.empty?

  # Work on a snapshot: rebuild_weight_array replaces @weight_array with a new
  # array (the heartbeat callbacks invoke it), so re-reading the instance
  # variable on every iteration could index past the end of a shorter
  # replacement and yield nil.
  weight_array = @weight_array
  wlen = weight_array.length
  error = nil

  wlen.times do
    @rr = (@rr + 1) % wlen
    node = weight_array[@rr]
    next unless node.available?

    begin
      send_data(node, tag, chunk)
      return
    rescue
      # for load balancing during detecting crashed servers
      error = $!  # use the latest error
    end
  end

  raise error if error
  raise "no nodes are available"  # TODO message
end
# File lib/fluent/plugin/out_forward.rb, line 333
# Opens a TCP connection to the node's resolved host and port and returns
# the socket.
def connect(node)
  # TODO unix socket?
  host = node.resolved_host
  port = node.port
  TCPSocket.new(host, port)
end
# File lib/fluent/plugin/out_forward.rb, line 233
# Returns the header written at the start of a forwarded message:
# FORWARD_HEADER_EXT when the extended internal protocol is enabled,
# FORWARD_HEADER otherwise.
def forward_header
  @extend_internal_protocol ? FORWARD_HEADER_EXT : FORWARD_HEADER
end
# File lib/fluent/plugin/out_forward.rb, line 393
# Callback for a received heartbeat packet. Finds the node whose sockaddr
# equals the sender's and records the heartbeat on it; the weight array is
# rebuilt when that call returns a truthy value.
#
# sockaddr - packed socket address of the sender.
# msg      - payload of the packet (not inspected).
def on_heartbeat(sockaddr, msg)
  # The unpacked port/host are not used; the call is kept so an address that
  # cannot be unpacked still raises here.
  Socket.unpack_sockaddr_in(sockaddr)

  sender = @nodes.find { |candidate| candidate.sockaddr == sockaddr }
  return unless sender

  #log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port
  rebuild_weight_array if sender.heartbeat
end
# File lib/fluent/plugin/out_forward.rb, line 351
# Periodic heartbeat callback. Does nothing once the plugin is flagged as
# finished. Otherwise, for every node:
#   * calls node.tick and rebuilds the weight array when it returns truthy;
#   * sends a heartbeat, either through send_heartbeat_tcp or as a single
#     NUL byte over the shared UDP socket.
#
# EAGAIN / EWOULDBLOCK / EINTR raised while sending are logged at debug level
# and skipped; any other exception propagates to the caller.
def on_timer
  return if @finished

  @nodes.each do |n|
    rebuild_weight_array if n.tick

    begin
      #log.trace "sending heartbeat #{n.host}:#{n.port} on #{@heartbeat_type}"
      if @heartbeat_type == :tcp
        send_heartbeat_tcp(n)
      else
        # one-byte payload; the literal had been garbled into an unbalanced string
        @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host)
      end
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
      # TODO log
      log.debug "failed to send heartbeat packet to #{n.host}:#{n.port}", :error=>$!.to_s
    end
  end
end
# File lib/fluent/plugin/out_forward.rb, line 187
# Recomputes @weight_array, the list write_objects walks round-robin.
#
# Steps:
#   1. Split the nodes into standby and regular ones.
#   2. Sum the weight of regular nodes that are not available; while that
#      lost weight is positive, add available standby nodes until it is
#      covered.
#   3. Emit each selected node weight / gcd(all weights) times.
#   4. Repeat the array so it holds roughly six slots per node (for load
#      balancing during detecting crashed servers).
#   5. Shuffle with a generator seeded by @rand_seed, so every rebuild of the
#      same node set produces the same order.
#
# When no node is selected, or every selected weight is 0, the gcd is 0 and
# the divisions below would raise ZeroDivisionError; an empty weight array is
# published instead, which write_objects reports as "no nodes are available".
#
# Returns the new weight array.
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition { |n| n.standby? }

  lost_weight = regular_nodes.inject(0) { |sum, n| n.available? ? sum : sum + n.weight }
  log.debug "rebuilding weight array", :lost_weight=>lost_weight

  if lost_weight > 0
    standby_nodes.each do |n|
      next unless n.available?

      regular_nodes << n
      log.warn "using standby node #{n.host}:#{n.port}", :weight=>n.weight
      lost_weight -= n.weight
      break if lost_weight <= 0
    end
  end

  gcd = regular_nodes.map { |n| n.weight }.inject(0) { |r, w| r.gcd(w) }
  # gcd is 0 only for an empty node list or all-zero weights
  return @weight_array = [] if gcd == 0

  weight_array = []
  regular_nodes.each do |n|
    (n.weight / gcd).times { weight_array << n }
  end
  # defensive: nothing emitted (e.g. non-positive weights) would divide by zero below
  return @weight_array = [] if weight_array.empty?

  # for load balancing during detecting crashed servers
  coe = (regular_nodes.size * 6) / weight_array.size
  weight_array *= coe if coe > 1

  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end
# File lib/fluent/plugin/out_forward.rb, line 257
# Sends one chunk to +node+ over a fresh TCP connection.
#
# Written to the socket, in order: the forward header, the msgpack-encoded
# tag, a raw-32 marker carrying chunk.size, the chunk body and, when the
# extended internal protocol is on, a msgpack-encoded option map.
#
# When an ACK is required and @ack_response_timeout is positive, the method
# then waits for a response:
#   * a non-empty response must carry an 'ack' equal to the sent chunk id;
#   * an empty response (peer closed the connection) is accepted;
#   * no response within the timeout disables the node.
#
# node  - destination node.
# tag   - tag of the events in the chunk.
# chunk - buffer chunk to send.
#
# Returns the unpacked response, or nil when none was read (for test).
# Raises ForwardOutputResponseError when the ACK does not match the chunk id.
# Raises ForwardOutputACKTimeoutError when no response arrives in time.
# The socket is closed in every case.
def send_data(node, tag, chunk)
  sock = connect(node)
  begin
    opt = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

    opt = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

    # beginArray(2)
    sock.write forward_header

    # writeRaw(tag)
    sock.write tag.to_msgpack  # tag

    # beginRaw(size)
    sz = chunk.size
    #if sz < 32
    #  # FixRaw
    #  sock.write [0xa0 | sz].pack('C')
    #elsif sz < 65536
    #  # raw 16
    #  sock.write [0xda, sz].pack('Cn')
    #else
      # raw 32
      sock.write [0xdb, sz].pack('CN')
    #end

    # writeRawBody(packed_es)
    chunk.write_to(sock)

    if @extend_internal_protocol
      option = {}
      option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
      sock.write option.to_msgpack

      if @require_ack_response && @ack_response_timeout > 0
        # Waiting for a response here results in a decrease of throughput because a chunk queue is locked.
        # To avoid a decrease of throughput, it is necessary to prepare a list of chunks that wait for responses
        # and process them asynchronously.
        if IO.select([sock], nil, nil, @ack_response_timeout)
          raw_data = sock.recv(1024)

          # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
          # In this case, the node is available and successfully close connection without sending responses.
          # ForwardInput is not expected to do so, but some alternatives may do so.
          # Therefore do not send the chunk again.
          unless raw_data.empty?
            # Serialization type of the response is same as sent data.
            res = MessagePack.unpack(raw_data)

            if res['ack'] != option['chunk']
              # Some errors may have occurred when ack and chunk id is different, so send the chunk again.
              raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different"
            end
          end

        else
          # IO.select returns nil on timeout.
          # There are 2 types of cases when no response has been received:
          # (1) the node does not support sending responses
          # (2) the node does support sending response but responses have not arrived for some reasons.
          # Logged through the `log` accessor, like the rest of this file.
          log.warn "no response from #{node.host}:#{node.port}. regard it as unavailable."
          node.disable!
          raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK"
        end
      end
    end

    node.heartbeat(false)
    return res  # for test
  ensure
    sock.close
  end
end
# Payload for a TCP heartbeat: FORWARD_HEADER followed by a msgpack-encoded
# empty string and a msgpack-encoded empty array.
# NOTE(review): presumably an empty tag plus an empty entry list - confirm
# against the receiving side. The only use visible in this excerpt is a
# commented-out write in send_heartbeat_tcp.
FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
# File lib/fluent/plugin/out_forward.rb, line 242
# TCP heartbeat: opens a connection to +node+, sets SO_LINGER from
# @send_timeout and records the heartbeat on the node. No data is written;
# reaching this point without connect raising is what counts as the
# heartbeat.
#
# node - node to probe.
#
# The socket is always closed, even when node.heartbeat raises. Errors from
# connect propagate to the caller.
def send_heartbeat_tcp(node)
  sock = connect(node)
  begin
    linger = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    # don't send any data to not cause a compatibility problem
    # (the struct timeval for SO_SNDTIMEO is therefore no longer packed either)
    #opt = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
    #sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
    #sock.write FORWARD_TCP_HEARTBEAT_DATA
    node.heartbeat(true)
  ensure
    sock.close
  end
end