class Fluent::Plugin::ForwardOutput
Constants
- FORWARD_HEADER
MessagePack FixArray length is 3
- LISTEN_PORT
Attributes
nodes[R]
read_interval[R]
recover_sample_size[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Output::new
# File lib/fluent/plugin/out_forward.rb, line 160
# Creates the plugin with empty state. No node is registered and no socket
# is opened here.
def initialize
  super

  @nodes = [] #=> [Node]

  # Runtime handles start out unset.
  @loop = @thread = @usock = nil

  @keep_alive_watcher_interval = 5 # TODO
  @suspend_flush = false

  # Metric objects are created in #configure.
  @healthy_nodes_count_metrics = @registered_nodes_count_metrics = nil
end
Public Instance Methods
after_shutdown()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#after_shutdown
# File lib/fluent/plugin/out_forward.rb, line 346
# Runs one final ack-collection pass (only when ack responses are required)
# before delegating to the superclass.
def after_shutdown
  if @require_ack_response
    last_ack
  end

  super
end
before_shutdown()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#before_shutdown
# File lib/fluent/plugin/out_forward.rb, line 341
# Delegates to the superclass, then raises the @suspend_flush flag that
# #try_flush and #try_write consult.
def before_shutdown
  super

  @suspend_flush = true
end
close()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#close
# File lib/fluent/plugin/out_forward.rb, line 324
# Closes the socket held in @usock (if any) on a best-effort basis, then
# delegates to the superclass.
def close
  if @usock
    begin
      @usock.close
    rescue StandardError
      # close socket and ignore errors: this socket will not be used anyway.
      nil
    end
  end

  super
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_forward.rb, line 174
# Validates the plugin configuration and builds the helper objects used at
# runtime: the ack handler, the connection manager, service discovery and
# the node-count metrics.
#
# conf - configuration element; handed to compat_parameters_convert and super.
#
# Raises Fluent::ConfigError for each invalid setting checked below.
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

  super

  unless @chunk_key_tag
    raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
  end

  # Milliseconds -> seconds.
  @read_interval = @read_interval_msec / 1000.0
  @recover_sample_size = @recover_wait / @heartbeat_interval

  # The deprecated :tcp value is rewritten to :transport after warning.
  if @heartbeat_type == :tcp
    log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
    @heartbeat_type = :transport
  end

  if @dns_round_robin && @heartbeat_type == :udp
    raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
  end

  if @transport == :tls
    # socket helper adds CA cert or signed certificate to same cert store internally so unify it in this place.
    if @tls_cert_path && !@tls_cert_path.empty?
      @tls_ca_cert_path = @tls_cert_path
    end
    # Every configured certificate path must exist and be readable.
    if @tls_ca_cert_path && !@tls_ca_cert_path.empty?
      @tls_ca_cert_path.each do |path|
        raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
        raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
      end
    end

    # Insecure mode turns hostname verification off and allows self-signed certs.
    if @tls_insecure_mode
      log.warn "TLS transport is configured in insecure way"
      @tls_verify_hostname = false
      @tls_allow_self_signed_cert = true
    end

    if Fluent.windows?
      if (@tls_cert_path || @tls_ca_cert_path) && @tls_cert_logical_store_name
        raise Fluent::ConfigError, "specified both cert path and tls_cert_logical_store_name is not permitted"
      end
    else
      # The certificate-store parameters are rejected on non-Windows platforms.
      raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_logical_store_name
      raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_thumbprint
    end
  end

  # The ack handler and the socket cache exist only when their feature is enabled.
  @ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
  socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
  @connection_manager = ConnectionManager.new(
    log: @log,
    secure: !!@security,
    connection_factory: method(:create_transfer_socket),
    socket_cache: socket_cache,
  )

  # Nodes are constructed through #build_node.
  service_discovery_configure(
    :out_forward_service_discovery_watcher,
    static_default_service_directive: 'server',
    load_balancer: LoadBalancer.new(log),
    custom_build_method: method(:build_node),
  )

  service_discovery_services.each do |server|
    # it's only for test
    @nodes << server
    unless @heartbeat_type == :none
      begin
        server.validate_host_resolution!
      rescue => e
        # Resolution failures are fatal unless explicitly tolerated; a tolerated
        # failure leaves the node disabled.
        raise unless @ignore_network_errors_at_startup
        log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
        server.disable!
      end
    end
  end

  unless @as_secondary
    if @compress == :gzip && @buffer.compress == :text
      @buffer.compress = :gzip
    elsif @compress == :text && @buffer.compress == :gzip
      log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
    end
  end

  if service_discovery_services.empty?
    raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
  end

  if !@keepalive && @keepalive_timeout
    log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
  end

  raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1

  @healthy_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "healthy_nodes_count", help_text: "Number of count healthy nodes", prefer_gauge: true)
  @registered_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "registered_nodes_count", help_text: "Number of count registered nodes", prefer_gauge: true)
end
create_transfer_socket(host, port, hostname, &block)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 379
# Opens a transfer socket to host:port using the configured transport.
#
# host     - destination address passed to the socket helper.
# port     - destination port passed to the socket helper.
# hostname - name handed to the TLS helper as `fqdn` (unused for plain TCP).
# block    - forwarded unchanged to the socket helper.
#
# Raises RuntimeError when @transport is neither :tls nor :tcp.
def create_transfer_socket(host, port, hostname, &block)
  # Timeouts that both transports pass through identically.
  shared_timeouts = {
    send_timeout: @send_timeout,
    recv_timeout: @ack_response_timeout,
    connect_timeout: @connect_timeout,
  }

  if @transport == :tls
    socket_create_tls(
      host, port,
      version: @tls_version,
      ciphers: @tls_ciphers,
      insecure: @tls_insecure_mode,
      verify_fqdn: @tls_verify_hostname,
      fqdn: hostname,
      allow_self_signed_cert: @tls_allow_self_signed_cert,
      cert_paths: @tls_ca_cert_path,
      cert_path: @tls_client_cert_path,
      private_key_path: @tls_client_private_key_path,
      private_key_passphrase: @tls_client_private_key_passphrase,
      cert_thumbprint: @tls_cert_thumbprint,
      cert_logical_store_name: @tls_cert_logical_store_name,
      cert_use_enterprise_store: @tls_cert_use_enterprise_store,
      # Enabling SO_LINGER causes tcp port exhaustion on Windows.
      # This is because dynamic ports are only 16384 (from 49152 to 65535) and
      # expiring SO_LINGER enabled ports should wait 4 minutes
      # where set by TcpTimeDelay. Its default value is 4 minutes.
      # So, we should disable SO_LINGER on Windows to prevent flood of waiting ports.
      linger_timeout: Fluent.windows? ? nil : @send_timeout,
      **shared_timeouts,
      &block
    )
  elsif @transport == :tcp
    # NOTE(review): unlike the TLS branch, linger_timeout is not disabled on
    # Windows here -- confirm whether that difference is intentional.
    socket_create_tcp(
      host, port,
      linger_timeout: @send_timeout,
      **shared_timeouts,
      &block
    )
  else
    raise "BUG: unknown transport protocol #{@transport}"
  end
end
forward_header()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 445
# Accessor for the FORWARD_HEADER constant.
#
# Returns FORWARD_HEADER itself (not a copy).
def forward_header
  FORWARD_HEADER
end
last_ack()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 356
# Performs a single ack-collection pass, first making sure the delayed
# commit timeout has been overwritten.
#
# Returns whatever #ack_check returns.
def last_ack
  overwrite_delayed_commit_timeout

  interval = ack_select_interval
  ack_check(interval)
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 275
# Always answers true.
def multi_workers_ready?
  true
end
overwrite_delayed_commit_timeout()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 283
# Forces @delayed_commit_timeout to ack_response_timeout + 2.
#
# The check is made against the value that is actually assigned, so calling
# this method again (e.g. from #last_ack) is a no-op that neither re-logs nor
# re-assigns, and a timeout that happens to equal ack_response_timeout still
# receives the +2 margin.
#
# Returns nil when nothing changed, otherwise the new timeout.
def overwrite_delayed_commit_timeout
  # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
  # But it should be overwritten by ack_response_timeout to rollback chunks after timeout
  desired_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
  return if @delayed_commit_timeout == desired_timeout

  log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
  @delayed_commit_timeout = desired_timeout
end
prefer_delayed_commit()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 279
# Returns the value of @require_ack_response unchanged.
def prefer_delayed_commit
  @require_ack_response
end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Output#start
# File lib/fluent/plugin/out_forward.rb, line 292
# Starts the background work the configuration asks for: heartbeat timer
# (plus a UDP socket/receiver for :udp heartbeats), the ack-reading thread,
# an optional startup connection check, and the keepalive socket watcher.
#
# Raises Fluent::UnrecoverableError when verify_connection_at_startup is set
# and a node fails its connection check.
def start
  super

  if @heartbeat_type != :none
    if @heartbeat_type == :udp
      @usock = socket_create_udp(
        service_discovery_services.first.host,
        service_discovery_services.first.port,
        nonblock: true
      )
      server_create_udp(
        :out_forward_heartbeat_receiver, 0,
        socket: @usock, max_bytes: @read_length,
        &method(:on_udp_heatbeat_response_recv)
      )
    end

    timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
  end

  if @require_ack_response
    overwrite_delayed_commit_timeout
    thread_create(:out_forward_receiving_ack, &method(:ack_reader))
  end

  if @verify_connection_at_startup
    service_discovery_services.each do |service|
      begin
        service.verify_connection
      rescue => error
        # Any failure here is reported as unrecoverable.
        log.fatal "forward's connection setting error: #{error.message}"
        raise Fluent::UnrecoverableError, error.message
      end
    end
  end

  if @keepalive
    timer_execute(
      :out_forward_keep_alived_socket_watcher,
      @keep_alive_watcher_interval,
      &method(:on_purge_obsolete_socks)
    )
  end
end
statistics()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#statistics
# File lib/fluent/plugin/out_forward.rb, line 423
# Refreshes the node-count metrics and returns the superclass statistics
# with both counts merged into the 'output' entry.
#
# Returns a Hash with the single key 'output'.
def statistics
  base = super
  services = service_discovery_services

  # Recount from zero on every call.
  @healthy_nodes_count_metrics.set(0)
  @registered_nodes_count_metrics.set(services.size)
  services.each do |service|
    @healthy_nodes_count_metrics.inc if service.available?
  end

  {
    'output' => base["output"].merge({
      'healthy_nodes_count' => @healthy_nodes_count_metrics.get,
      'registered_nodes_count' => @registered_nodes_count_metrics.get,
    })
  }
end
stop()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#stop
# File lib/fluent/plugin/out_forward.rb, line 333
# Delegates to the superclass, then stops the connection manager when
# keepalive is enabled.
def stop
  super

  @connection_manager.stop if @keepalive
end
try_flush()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#try_flush
# File lib/fluent/plugin/out_forward.rb, line 351
# Delegates to the superclass unless acks are required and flushing has been
# suspended (see #before_shutdown); in that case it does nothing.
def try_flush
  flush_suspended = @require_ack_response && @suspend_flush

  super unless flush_suspended
end
try_write(chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 368
# Sends a chunk to a selected node; the commit happens later via ack handling.
#
# chunk - the buffer chunk to send. An empty chunk is committed immediately.
#
# Returns nil, or the result of #last_ack when acks are required and
# flushing is suspended.
def try_write(chunk)
  log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)

  if chunk.empty?
    commit_write(chunk.unique_id)
    return nil
  end

  destination_tag = chunk.metadata.tag
  service_discovery_select_service do |node|
    node.send_data(destination_tag, chunk)
  end

  # While flushing is suspended, collect the ack inline.
  if @require_ack_response && @suspend_flush
    last_ack
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 361
# Sends a chunk to a selected node.
#
# chunk - the buffer chunk to send; an empty chunk is skipped.
#
# Returns nil for an empty chunk, otherwise the result of
# service_discovery_select_service.
def write(chunk)
  unless chunk.empty?
    destination_tag = chunk.metadata.tag
    service_discovery_select_service do |node|
      node.send_data(destination_tag, chunk)
    end
  end
end
Private Instance Methods
ack_check(select_interval)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 516
# Collects pending ack responses and commits or rolls back each chunk
# according to the reported result.
#
# select_interval - passed straight to AckHandler#collect_response.
def ack_check(select_interval)
  @ack_handler.collect_response(select_interval) do |chunk_id, node, socket, outcome|
    # The socket is handed back to the connection manager whatever the outcome.
    @connection_manager.close(socket)

    case outcome
    when AckHandler::Result::SUCCESS
      commit_write(chunk_id)
    when AckHandler::Result::FAILED
      node&.disable!
      if chunk_id
        rollback_write(chunk_id, update_retry: false)
      end
    when AckHandler::Result::CHUNKID_UNMATCHED
      rollback_write(chunk_id, update_retry: false)
    else
      log.warn("BUG: invalid status #{outcome} #{chunk_id}")
      rollback_write(chunk_id, update_retry: false) if chunk_id
    end
  end
end
ack_reader()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 508
# Thread body: keeps running ack-collection passes for as long as the
# current thread is reported as running. The select interval is computed
# once, before the loop.
def ack_reader
  interval = ack_select_interval

  ack_check(interval) while thread_current_running?
end
ack_select_interval()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 500
# Picks the interval used when polling for acks: 1 when the delayed commit
# timeout exceeds 3, otherwise a third of that timeout.
def ack_select_interval
  @delayed_commit_timeout > 3 ? 1 : @delayed_commit_timeout / 3.0
end
build_node(server)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 451
# Builds the node object for one discovered server (registered as the
# custom build method in #configure).
#
# server - object exposing name, host, port and weight.
#
# Returns a NoneHeartbeatNode when heartbeat_type is :none, otherwise a Node.
def build_node(server)
  display_name = server.name || "#{server.host}:#{server.port}"
  log.info "adding forwarding server '#{display_name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id

  detector = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
  node_class = @heartbeat_type == :none ? NoneHeartbeatNode : Node

  node_class.new(self, server, failure: detector, connection_manager: @connection_manager, ack_handler: @ack_handler)
end
on_heartbeat_timer()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 463
# Timer callback: sends a heartbeat to every service, ticks each one, and
# rebalances service discovery when any of those calls reports a change.
# Heartbeat send errors are logged at debug level and never propagate.
def on_heartbeat_timer
  rebuild_needed = false

  service_discovery_services.each do |node|
    begin
      log.trace "sending heartbeat", host: node.host, port: node.port, heartbeat_type: @heartbeat_type
      node.usock = @usock if @usock
      rebuild_needed = true if node.send_heartbeat
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED, Errno::ETIMEDOUT => error
      log.debug "failed to send heartbeat packet", host: node.host, port: node.port, heartbeat_type: @heartbeat_type, error: error
    rescue => error
      log.debug "unexpected error happen during heartbeat", host: node.host, port: node.port, heartbeat_type: @heartbeat_type, error: error
    end

    # tick runs for every node, even when the heartbeat send failed.
    rebuild_needed = true if node.tick
  end

  service_discovery_rebalance if rebuild_needed
end
on_purge_obsolete_socks()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 496
# Timer callback (scheduled in #start when keepalive is enabled) that asks
# the connection manager to purge obsolete sockets.
def on_purge_obsolete_socks
  @connection_manager.purge_obsolete_socks
end
on_udp_heatbeat_response_recv(data, sock)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 484
# Callback for a received UDP heartbeat response. Looks up the node whose
# sockaddr matches the sender and rebalances when that node's #heartbeat
# returns a truthy value; unknown senders are logged with a warning.
#
# data - received payload (not inspected here).
# sock - object exposing remote_host and remote_port of the sender.
def on_udp_heatbeat_response_recv(data, sock)
  sender_addr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host)
  node = service_discovery_services.find { |candidate| candidate.sockaddr == sender_addr }

  unless node
    return log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out")
  end

  # log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port
  service_discovery_rebalance if node.heartbeat
end