class Fluent::Plugin::ForwardOutput

Constants

ACKWaitingSockInfo
FORWARD_HEADER

MessagePack FixArray whose length is 3.

LISTEN_PORT

Attributes

nodes[R]
read_interval[R]
recover_sample_size[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Output::new
# File lib/fluent/plugin/out_forward.rb, line 133
# Builds the plugin with blank runtime state: an empty node list, and nil
# placeholders for the loop, thread, UDP socket and ack bookkeeping. Nothing
# else is set up here.
def initialize
  super

  # Holds the Node objects of this output.
  @nodes = []
  @loop = @thread = nil

  @usock = nil
  @sock_ack_waiting_mutex = @sock_ack_waiting = nil
end

Public Instance Methods

close() click to toggle source
Calls superclass method Fluent::Plugin::Output#close
# File lib/fluent/plugin/out_forward.rb, line 258
# Closes the UDP socket held in @usock (when there is one) and then hands
# over to the superclass implementation.
def close
  begin
    @usock.close if @usock
  rescue StandardError
    # Deliberately swallowed: the socket is never used again after this point.
  end
  super
end
configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_forward.rb, line 145
# Validates the configuration and builds the list of destination nodes.
#
# Steps, in order:
# * converts compat buffer parameters (default chunk key 'tag') and runs the
#   superclass configure;
# * requires 'tag' to be a buffer chunk key;
# * derives @read_interval (seconds) and @recover_sample_size;
# * maps the deprecated :tcp heartbeat type onto :transport;
# * rejects dns_round_robin combined with UDP heartbeats;
# * for TLS transport, checks the certificate paths and applies insecure mode;
# * creates one node per entry of @servers;
# * aligns buffer compression with the plugin's `compress` setting unless the
#   plugin runs as a secondary;
# * requires at least one node and an ack_response_timeout of at least 1.
#
# Raises Fluent::ConfigError for every validation failure. A host resolution
# error is re-raised unless @ignore_network_errors_at_startup is set.
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
  super

  raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output" unless @chunk_key_tag

  @read_interval = @read_interval_msec / 1000.0
  @recover_sample_size = @recover_wait / @heartbeat_interval

  if @heartbeat_type == :tcp
    log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
    @heartbeat_type = :transport
  end

  if @dns_round_robin && @heartbeat_type == :udp
    raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
  end

  if @transport == :tls
    cert_paths = @tls_cert_path
    if cert_paths && !cert_paths.empty?
      cert_paths.each do |cert|
        raise Fluent::ConfigError, "specified cert path does not exist:#{cert}" unless File.exist?(cert)
        raise Fluent::ConfigError, "specified cert path is not readable:#{cert}" unless File.readable?(cert)
      end
    end

    if @tls_insecure_mode
      log.warn "TLS transport is configured in insecure way"
      # Insecure mode switches off hostname verification and accepts self-signed certificates.
      @tls_verify_hostname = false
      @tls_allow_self_signed_cert = true
    end
  end

  @servers.each do |server|
    detector = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
    label = server.name || "#{server.host}:#{server.port}"
    log.info "adding forwarding server '#{label}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id

    if @heartbeat_type == :none
      @nodes << NoneHeartbeatNode.new(self, server, failure: detector)
      next
    end

    node = Node.new(self, server, failure: detector)
    begin
      node.validate_host_resolution!
    rescue => e
      raise unless @ignore_network_errors_at_startup
      # The node is kept, but starts out disabled.
      log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
      node.disable!
    end
    @nodes << node
  end

  unless @as_secondary
    case @compress
    when :gzip
      @buffer.compress = :gzip if @buffer.compress == :text
    when :text
      log.info "buffer is compressed.  If you also want to save the bandwidth of a network, Add `compress` configuration in <match>" if @buffer.compress == :gzip
    end
  end

  raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required" if @nodes.empty?
  raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end
create_transfer_socket(host, port, hostname, &block) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 316
# Opens a socket to host:port using the configured transport.
#
# host, port - destination passed straight to the socket helper
# hostname   - passed as `fqdn` for TLS connections; unused for plain TCP
# block      - forwarded to the socket helper
#
# The send timeout doubles as linger timeout and the ack response timeout is
# used as receive timeout. Raises RuntimeError for any transport other than
# :tls or :tcp.
def create_transfer_socket(host, port, hostname, &block)
  timeouts = {
    linger_timeout: @send_timeout,
    send_timeout: @send_timeout,
    recv_timeout: @ack_response_timeout,
  }

  if @transport == :tls
    tls_options = {
      version: @tls_version,
      ciphers: @tls_ciphers,
      insecure: @tls_insecure_mode,
      verify_fqdn: @tls_verify_hostname,
      fqdn: hostname,
      allow_self_signed_cert: @tls_allow_self_signed_cert,
      cert_paths: @tls_cert_path,
    }.merge(timeouts)
    socket_create_tls(host, port, **tls_options, &block)
  elsif @transport == :tcp
    socket_create_tcp(host, port, **timeouts, &block)
  else
    raise "BUG: unknown transport protocol #{@transport}"
  end
end
forward_header() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 348
# Returns the FORWARD_HEADER constant.
def forward_header
  FORWARD_HEADER
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 218
# Always returns true.
# NOTE(review): presumably this is how the plugin declares that it can run
# under multiple workers -- confirm against the Output base class.
def multi_workers_ready?
  true
end
prefer_delayed_commit() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 222
# Returns the value of @require_ack_response.
# NOTE(review): presumably a truthy value makes the base class use #try_write
# (delayed commit) instead of #write -- confirm against the Output base class.
def prefer_delayed_commit
  @require_ack_response
end
select_a_healthy_node() { |node| ... } click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 294
def select_a_healthy_node
  error = nil

  wlen = @weight_array.length
  wlen.times do
    @rr = (@rr + 1) % wlen
    node = @weight_array[@rr]
    next unless node.available?

    begin
      ret = yield node
      return ret, node
    rescue
      # for load balancing during detecting crashed servers
      error = $!  # use the latest error
    end
  end

  raise error if error
  raise NoNodesAvailable, "no nodes are available"
end
start() click to toggle source
Calls superclass method Fluent::Compat::Output#start
# File lib/fluent/plugin/out_forward.rb, line 226
# Starts the plugin after the superclass has started:
# * replaces @delayed_commit_timeout with ack_response_timeout + 2 when the
#   two differ;
# * seeds and builds the weight array and resets the round-robin position;
# * for UDP heartbeats, opens the UDP socket (addressed at the first node) and
#   registers a receiver that feeds #on_heartbeat;
# * registers the periodic heartbeat timer unless heartbeats are disabled;
# * when ack responses are required, creates the ack bookkeeping and starts
#   the thread running #ack_reader.
def start
  super

  # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
  # But it should be overwritten by ack_response_timeout to rollback chunks after timeout
  ack_timeout = @ack_response_timeout
  if ack_timeout && @delayed_commit_timeout != ack_timeout
    log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
    @delayed_commit_timeout = ack_timeout + 2 # minimum ack_reader IO.select interval is 1s
  end

  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  if @heartbeat_type == :udp
    first_node = @nodes.first
    @usock = socket_create_udp(first_node.host, first_node.port, nonblock: true)
    server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length) do |data, sock|
      on_heartbeat(Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host), data)
    end
  end
  timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_timer)) unless @heartbeat_type == :none

  if @require_ack_response
    @sock_ack_waiting_mutex = Mutex.new
    @sock_ack_waiting = []
    thread_create(:out_forward_receiving_ack, &method(:ack_reader))
  end
end
try_write(chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 278
# Sends a chunk to a healthy node and registers the socket so that the ack
# for the chunk can be awaited.
#
# An empty chunk is committed right away and nothing is sent. Otherwise the
# chunk is handed to the first node that accepts it, and an
# ACKWaitingSockInfo (socket, chunk id, Base64 chunk id, node, current clock
# value, ack timeout) is appended to @sock_ack_waiting under its mutex.
def try_write(chunk)
  log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)

  if chunk.empty?
    commit_write(chunk.unique_id)
    return
  end

  tag = chunk.metadata.tag
  sock, node = select_a_healthy_node { |candidate| candidate.send_data(tag, chunk) }

  waiting = ACKWaitingSockInfo.new(
    sock,
    chunk.unique_id,
    Base64.encode64(chunk.unique_id),
    node,
    Fluent::Clock.now,
    @ack_response_timeout
  )
  @sock_ack_waiting_mutex.synchronize { @sock_ack_waiting << waiting }
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 266
# Sends a non-empty chunk, tagged with the chunk's metadata tag, to the first
# healthy node that accepts it. Empty chunks are skipped and yield nil.
def write(chunk)
  unless chunk.empty?
    tag = chunk.metadata.tag
    select_a_healthy_node { |target| target.send_data(tag, chunk) }
  end
end

Private Instance Methods

ack_reader() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 458
# Loop that waits for ack responses on the sockets registered in
# @sock_ack_waiting. Runs for as long as thread_current_running? is true.
#
# Each iteration:
# 1. drops expired entries: the node is disabled, the socket closed and the
#    chunk rolled back;
# 2. waits (at most select_interval seconds) for any remaining socket to
#    become readable;
# 3. reads the ack from every readable socket and commits the chunk whose id
#    comes back.
#
# Errors raised inside an iteration are logged and the loop continues.
def ack_reader
  # Poll once per second, or three times per timeout period for short timeouts.
  select_interval = if @delayed_commit_timeout > 3
                      1
                    else
                      @delayed_commit_timeout / 3.0
                    end

  unpacker = Fluent::Engine.msgpack_unpacker

  while thread_current_running?
    now = Fluent::Clock.now
    sockets = []
    begin
      @sock_ack_waiting_mutex.synchronize do
        new_list = []
        @sock_ack_waiting.each do |info|
          if info.expired?(now)
            # There are 2 types of cases when no response has been received from socket:
            # (1) the node does not support sending responses
            # (2) the node does support sending response but responses have not arrived for some reasons.
            log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
            info.node.disable!
            info.sock.close rescue nil
            rollback_write(info.chunk_id)
          else
            sockets << info.sock
            new_list << info
          end
        end
        @sock_ack_waiting = new_list
      end

      readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
      next unless readable_sockets

      readable_sockets.each do |sock|
        chunk_id = read_ack_from_sock(sock, unpacker)
        # read_ack_from_sock returns nil when there is nothing to commit
        # (connection closed by the peer, or an ack that does not match the
        # chunk). Committing nil would act on a chunk that does not exist and,
        # if it raised, would skip the remaining readable sockets.
        commit_write(chunk_id) if chunk_id
      end
    rescue => e
      log.error "unexpected error while receiving ack", error: e
      log.error_backtrace
    end
  end
end
on_heartbeat(sockaddr, msg) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 411
# Handles a heartbeat that arrived from `sockaddr`: looks up the node with
# that socket address and rebuilds the weight array when the node's
# #heartbeat call returns a truthy value. Heartbeats from unknown addresses
# are ignored. `msg` is not inspected.
def on_heartbeat(sockaddr, msg)
  sender = @nodes.find { |candidate| candidate.sockaddr == sockaddr }
  return unless sender

  # log.trace "heartbeat arrived", name: sender.name, host: sender.host, port: sender.port
  rebuild_weight_array if sender.heartbeat
end
on_timer() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 396
# Periodic heartbeat tick. For every node: rebuilds the weight array when the
# node's #tick returns a truthy value, hands the shared UDP socket to the node
# (when one exists) and sends a heartbeat. EAGAIN, EWOULDBLOCK, EINTR and
# ECONNREFUSED while sending are logged at debug level; any other error
# propagates.
def on_timer
  @nodes.each do |node|
    rebuild_weight_array if node.tick

    begin
      log.trace "sending heartbeat", host: node.host, port: node.port, heartbeat_type: @heartbeat_type
      node.usock = @usock if @usock
      node.send_heartbeat
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED => e
      log.debug "failed to send heartbeat packet", host: node.host, port: node.port, heartbeat_type: @heartbeat_type, error: e
    end
  end
end
read_ack_from_sock(sock, unpacker) click to toggle source

Returns the ID of the chunk to be committed, or nil when there is nothing to commit.

# File lib/fluent/plugin/out_forward.rb, line 421
# Reads one ack response from a readable socket and decides what to do with
# the chunk that was sent over it.
#
# sock     - socket that has become readable
# unpacker - unpacker that is fed the received bytes and read once
#
# Returns the chunk id to be committed when the ack matches the chunk, and nil
# in every other case:
# * the peer closed or reset the connection: the node is disabled; the chunk
#   is neither committed nor rolled back here;
# * the ack does not match the chunk id: the chunk is rolled back;
# * an unexpected error occurred: it is logged.
#
# Whenever the waiting entry for the socket was found, it is removed from
# @sock_ack_waiting and its socket is closed before returning.
def read_ack_from_sock(sock, unpacker)
  begin
    raw_data = sock.recv(@read_length)
  rescue Errno::ECONNRESET
    raw_data = ""
  end
  info = @sock_ack_waiting_mutex.synchronize{ @sock_ack_waiting.find{|i| i.sock == sock } }

  # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
  # If this happens we assume the data wasn't delivered and retry it.
  if raw_data.empty?
    log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port
    info.node.disable!
    return nil
  end

  unpacker.feed(raw_data)
  res = unpacker.read
  log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
  if res['ack'] != info.chunk_id_base64
    # Some errors may have occurred when ack and chunk id is different, so send the chunk again.
    log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
    rollback_write(info.chunk_id)
    return nil
  end

  log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
  info.chunk_id
rescue => e
  log.error "unexpected error while receiving ack message", error: e
  log.error_backtrace
  # Return nil explicitly: without it the method would return whatever the
  # logger call returns, and the caller treats the return value as a chunk id.
  nil
ensure
  if info
    @sock_ack_waiting_mutex.synchronize do
      @sock_ack_waiting.delete(info)
    end
    # The entry is gone, so nothing will read from this socket again. Close it
    # here, as the expiry path in ack_reader does, instead of leaking it.
    info.sock.close rescue nil
  end
end
rebuild_weight_array() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 354
# Recomputes @weight_array, the list that select_a_healthy_node walks.
#
# * Regular (non-standby) nodes always take part, whether available or not.
# * Available standby nodes are added, in order, until they cover the weight
#   of the regular nodes that are currently unavailable.
# * Each participating node appears weight / gcd(weights) times; the list is
#   then repeated so it is roughly six times the number of participating nodes
#   and shuffled deterministically from @rand_seed.
#
# When there is nothing to balance over (no regular nodes, or all weights
# are zero) the weight array becomes empty instead of raising ZeroDivisionError.
#
# Returns the new weight array.
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition {|n|
    n.standby?
  }

  lost_weight = 0
  regular_nodes.each {|n|
    unless n.available?
      lost_weight += n.weight
    end
  }
  log.debug "rebuilding weight array", lost_weight: lost_weight

  if lost_weight > 0
    standby_nodes.each {|n|
      if n.available?
        regular_nodes << n
        log.warn "using standby node #{n.host}:#{n.port}", weight: n.weight
        lost_weight -= n.weight
        break if lost_weight <= 0
      end
    }
  end

  gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) }
  if gcd.zero?
    # gcd is 0 only when there are no participating nodes or every weight is
    # 0. Both divisions below would raise ZeroDivisionError, so publish an
    # empty array; select_a_healthy_node then reports that no nodes are available.
    log.warn "no nodes with a positive weight are available for the weight array"
    return @weight_array = []
  end

  weight_array = []
  regular_nodes.each {|n|
    (n.weight / gcd).times {
      weight_array << n
    }
  }

  # for load balancing during detecting crashed servers
  coe = (regular_nodes.size * 6) / weight_array.size
  weight_array *= coe if coe > 1

  # Seeded shuffle: the same seed and node set always give the same order.
  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end