module Fluent::PluginHelper::Server

Constants

CONNECTION_PROTOCOLS
PROTOCOLS
SERVER_TRANSPORT_PARAMS
ServerInfo

Attributes

_servers[R]

stop : [-] shutdown : detach server event handler from event loop (event_loop) close : close listening sockets terminate: remote all server instances

Public Class Methods

included(mod) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 294
def self.included(mod)
  mod.include ServerTransportParams
end
new() click to toggle source
Calls superclass method Fluent::PluginHelper::EventLoop::new
# File lib/fluent/plugin_helper/server.rb, line 298
def initialize
  super
  @_servers = []
  @_server_connections = []
  @_server_mutex = Mutex.new
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/server.rb, line 305
def configure(conf)
  super

  if @transport_config
    if @transport_config.protocol == :tls
      cert_option_server_validate!(@transport_config)
    end
  end
end
server_attach(title, proto, port, bind, shared, server) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 206
def server_attach(title, proto, port, bind, shared, server)
  @_servers << ServerInfo.new(title, proto, port, bind, shared, server)
  event_loop_attach(server)
end
server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) click to toggle source

server_create(:title, @port) do |data|

# ...

end server_create(:title, @port) do |data, conn|

# ...

end server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock|

sock.remote_host
sock.remote_port
# ...

end

# File lib/fluent/plugin_helper/server.rb, line 122
def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp
  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
  raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2

  if proto == :tcp || proto == :tls # default linger_timeout only for server
    socket_options[:linger_timeout] ||= 0
  end

  unless socket
    socket_option_validate!(proto, **socket_options)
    socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
  end

  if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections
    raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog
  end
  if proto != :udp # UDP options
    raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes
    raise ArgumentError, "BUG: flags is available only for udp" if flags != 0
  end

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :udp
    raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
    if socket
      sock = socket
      close_socket = false
    else
      sock = server_create_udp_socket(shared, bind, port)
      socket_option_setter.call(sock)
      close_socket = true
    end
    server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback)
  when :unix
    raise "not implemented yet"
  else
    raise "BUG: unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end
server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block) click to toggle source

server_create_connection(:title, @port) do |conn|

# on connection
source_addr = conn.remote_host
source_port = conn.remote_port
conn.data do |data|
  # on data
  conn.write resp # ...
  conn.close
end

end

# File lib/fluent/plugin_helper/server.rb, line 70
def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
  raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
  raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1

  if proto == :tcp || proto == :tls # default linger_timeout only for server
    socket_options[:linger_timeout] ||= 0
  end

  socket_option_validate!(proto, **socket_options)
  socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
  when :unix
    raise "not implemented yet"
  else
    raise "unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end
server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 211
def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    @_server_mutex.synchronize do
      @_server_connections << conn
    end
  end
  server.listen(backlog) if backlog
  server
end
server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 224
def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
  context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    @_server_mutex.synchronize do
      @_server_connections << conn
    end
  end
  server.listen(backlog) if backlog
  server
end
server_create_tcp(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 188
def server_create_tcp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tcp, **kwargs, &callback)
end
server_create_tcp_socket(shared, bind, port) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 348
def server_create_tcp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_tcp(bind, port)
         else
           TCPServer.new(bind, port) # this method call can create sockets for AF_INET6
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end
server_create_tls(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 196
def server_create_tls(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tls, **kwargs, &callback)
end
server_create_transport_section_object(opts) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 248
def server_create_transport_section_object(opts)
  transport_section = configured_section_create(:transport)
  SERVER_TRANSPORT_PARAMS.each do |param|
    if opts.has_key?(param)
      transport_section[param] = opts[param]
    end
  end
  transport_section
end
server_create_udp(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 192
def server_create_udp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :udp, **kwargs, &callback)
end
server_create_udp_socket(shared, bind, port) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 359
def server_create_udp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_udp(bind, port)
         else
           family = IPAddr.new(IPSocket.getaddress(bind)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6
           usock = UDPSocket.new(family)
           usock.bind(bind, port)
           usock
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end
server_create_unix(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 200
def server_create_unix(title, port, **kwargs, &callback)
  server_create(title, port, proto: :unix, **kwargs, &callback)
end
server_socket_manager_client() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 340
def server_socket_manager_client
  socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  if Fluent.windows?
    socket_manager_path = socket_manager_path.to_i
  end
  ServerEngine::SocketManager::Client.new(socket_manager_path)
end
server_wait_until_start() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 48
def server_wait_until_start
  # event_loop_wait_until_start works well for this
end
server_wait_until_stop() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 52
def server_wait_until_stop
  sleep 0.1 while @_servers.any?{|si| si.server.attached? }
  @_servers.each{|si| si.server.close rescue nil }
end
shutdown() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 327
def shutdown
  @_server_connections.each do |conn|
    conn.close rescue nil
  end

  super
end
stop() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/server.rb, line 315
def stop
  @_server_mutex.synchronize do
    @_servers.each do |si|
      si.server.detach if si.server.attached?
      # to refuse more connections: (connected sockets are still alive here)
      si.server.close rescue nil
    end
  end

  super
end
terminate() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 335
def terminate
  @_servers = []
  super
end