class Fluent::Plugin::SocketHandler

Constants

MAX_LENGTH_RECEIVE_ONCE

Public Class Methods

new(path, delimiter: "\n", log: nil) click to toggle source
# File lib/fluent/plugin/in_unix_client.rb, line 73
# Builds a handler for the socket located at +path+. No connection is made
# here; the socket stays unset until it is opened later.
#
# @param path [String] filesystem path of the socket to connect to
# @param delimiter [String] handed to Buffer.new (presumably the record
#   separator; Buffer is defined elsewhere -- confirm there)
# @param log [#info, #warn, #error, #debug, nil] optional logger; every call
#   on it uses safe navigation, so nil turns logging off
def initialize(path, delimiter: "\n", log: nil)
  @path, @log = path, log
  # Start out disconnected.
  @socket = nil
  @buf = Buffer.new(delimiter)
end

Public Instance Methods

connected?() click to toggle source
# File lib/fluent/plugin/in_unix_client.rb, line 80
# Tells whether a socket object is currently held.
#
# @return [Boolean] false while @socket is nil, true otherwise
def connected?
  return false if @socket.nil?

  true
end
try_close() click to toggle source
# File lib/fluent/plugin/in_unix_client.rb, line 102
# Closes the held socket, if any, on a best-effort basis. A failure while
# closing is logged rather than raised, and the socket reference is
# cleared in every case.
def try_close
  @socket.close unless @socket.nil?
rescue StandardError => close_error
  @log&.error "in_unix_client: failed to close socket. #{close_error.message}"
ensure
  # Drop the reference whether or not close succeeded.
  @socket = nil
end
try_receive(timeout: 1) click to toggle source
# File lib/fluent/plugin/in_unix_client.rb, line 84
# Performs one receive attempt.
#
# When no socket is held, this call only tries to open one and yields no
# records. Otherwise it waits up to +timeout+ for readable data, reads
# it, and hands back the extracted records. If the read reports that the
# peer closed the connection, a warning is logged and the socket is closed.
#
# @param timeout [Numeric] passed to exist_data? as the wait limit
# @return [Array] the records from try_get_records, or an empty array when
#   disconnected, when nothing became readable, or when the peer closed
def try_receive(timeout: 1)
  if connected?
    return [] unless exist_data?(timeout)

    received, peer_closed = try_get_records
    return received unless peer_closed

    @log&.warn "in_unix_client: server socket seems to be closed."
    try_close
  else
    # Not connected yet: spend this call on (re)connecting only.
    try_open
  end

  []
end

Private Instance Methods

exist_data?(timeout) click to toggle source
# File lib/fluent/plugin/in_unix_client.rb, line 121
# Waits until the held socket becomes readable or the timeout elapses.
#
# @param timeout [Numeric, nil] wait limit handed to IO.select
# @return [Boolean] true when IO.select reported the socket readable,
#   false when it returned nothing before the timeout
def exist_data?(timeout)
  ready = IO.select([@socket], nil, nil, timeout)
  ready ? true : false
end
try_get_records() click to toggle source
# File lib/fluent/plugin/in_unix_client.rb, line 126
# Reads once from the socket without blocking and extracts the records
# that the buffer can produce from the data received so far.
#
# Always returns a two-element result so callers can safely destructure it.
# When a read error occurs, the method logs it, pauses for three seconds,
# and reports "no records, not closed" instead of leaking the return value
# of +sleep+ to the caller.
#
# @return [Array(Array, Boolean)] the extracted records and a flag telling
#   whether the peer appears to have closed the connection
def try_get_records
  msg, * = @socket.recvmsg_nonblock(MAX_LENGTH_RECEIVE_ONCE)
  # An empty message is treated as the peer having closed the connection.
  return [], true if msg.empty?

  @buf << msg
  return @buf.extract_records, false
rescue IO::WaitReadable => e
  @log&.debug "in_unix_client: there were no data though the socket was recognized readable by IO::select. #{e.message}"
  sleep 3
  return [], false
rescue => e
  @log&.error "in_unix_client: failed to receive data. #{e.message}"
  sleep 3
  # Keep the socket open, as before; only the return shape is normalized.
  return [], false
end
try_open() click to toggle source
# File lib/fluent/plugin/in_unix_client.rb, line 112
# Attempts to connect to the socket at @path and keep the connection in
# @socket. A failed attempt is logged as a warning, leaves @socket unset,
# and is followed by a three-second pause before returning.
def try_open
  @socket = UNIXSocket.open(@path)
  @log&.info "in_unix_client: opened socket: #{@path}."
rescue StandardError => open_error
  @log&.warn "in_unix_client: failed to open socket: #{@path}, due to: #{open_error.message}"
  # Make sure a failed attempt never leaves a stale socket reference.
  @socket = nil
  sleep 3
end