class Fluent::SecureForwardInput

Constants

DEFAULT_SECURE_LISTEN_PORT
HOSTNAME_PLACEHOLDERS

Attributes

nodes[R]
read_interval[R]
sessions[R]
socket_interval[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_secure_forward.rb, line 90
# Creates the plugin instance.
#
# Delegates to the superclass initializer, then clears the memoized
# certificate/key pair that #certificate fills in lazily. Both halves of the
# pair are initialised so the memoization check in #certificate never reads
# an unset instance variable.
def initialize
  super
  @cert = nil
  @key = nil
end

Public Instance Methods

certificate() click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 201
# Returns the server certificate and private key as a two-element array,
# loading or generating them on first use and reusing them afterwards.
#
# Source of the pair, in priority order:
# 1. @cert_path set: the key is read from @private_key_path and the
#    certificates from @cert_path; the first certificate becomes the server
#    certificate and the remaining ones are stored in @client_ca.
# 2. @ca_cert_path set: a server pair is generated using the configured CA.
# 3. otherwise: a self-signed server pair is generated.
#
# @client_ca is reset to nil whenever the pair is (re)built.
def certificate
  return @cert, @key if @cert && @key

  @client_ca = nil
  cert_util = Fluent::SecureForward::CertUtil

  if @cert_path
    # Key first, then certificates -- same order as failures would surface.
    pem = File.read(@private_key_path)
    @key = OpenSSL::PKey::RSA.new(pem, @private_key_passphrase)
    chain = cert_util.certificates_from_file(@cert_path)
    @cert = chain.shift
    @client_ca = chain
    return @cert, @key
  end

  # Options shared by both generation strategies.
  subject_opts = {
    private_key_length: @generate_private_key_length,
    country: @generate_cert_country,
    state: @generate_cert_state,
    locality: @generate_cert_locality,
    common_name: @generate_cert_common_name,
  }

  @cert, @key =
    if @ca_cert_path
      ca_opts = {
        ca_cert_path: @ca_cert_path,
        ca_key_path: @ca_private_key_path,
        ca_key_passphrase: @ca_private_key_passphrase,
      }
      cert_util.generate_server_pair(ca_opts.merge(subject_opts))
    else
      cert_util.generate_self_signed_server_pair(subject_opts)
    end

  return @cert, @key
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_secure_forward.rb, line 112
# Validates the configuration and prepares the state used at runtime.
#
# Steps, in order:
# 1. Substitute hostname placeholders in +conf+ (using the 'hostname' key
#    when present, otherwise the machine's hostname) before the superclass
#    parses it.
# 2. In secure mode, require either cert_path (with its private key settings
#    and at least one certificate) or ca_cert_path (with its private key
#    settings); in insecure mode only log a warning.
# 3. Convert the millisecond intervals to seconds.
# 4. Build @nodes from the configured clients; each client must specify
#    exactly one of 'host' or 'network'.
# 5. Default the generated certificate's common name to @self_hostname and
#    load/generate the certificate so failures surface at startup.
#
# Raises Fluent::ConfigError for any invalid setting. Returns true.
def configure(conf)
  effective_hostname =
    if conf.has_key?('hostname')
      conf['hostname'].to_s
    else
      Socket.gethostname
    end
  replace_hostname_placeholder(conf, effective_hostname)

  super

  if @secure
    if @cert_path
      raise Fluent::ConfigError, "private_key_path required" unless @private_key_path
      raise Fluent::ConfigError, "private_key_passphrase required" unless @private_key_passphrase
      loaded = Fluent::SecureForward::CertUtil.certificates_from_file(@cert_path)
      raise Fluent::ConfigError, "no valid certificates in cert_path: #{@cert_path}" if loaded.size < 1
    elsif @ca_cert_path
      raise Fluent::ConfigError, "ca_private_key_path required" unless @ca_private_key_path
      raise Fluent::ConfigError, "ca_private_key_passphrase required" unless @ca_private_key_passphrase
    else
      raise Fluent::ConfigError, "cert_path or ca_cert_path required for secure communication"
    end
  else
    log.warn "'insecure' mode has vulnerability for man-in-the-middle attacks for clients (output plugins)."
  end

  @read_interval = @read_interval_msec / 1000.0
  @socket_interval = @socket_interval_msec / 1000.0

  @nodes = []

  @clients.each do |client|
    if client.host && client.network
      raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
    end
    unless client.host || client.network
      raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
    end

    # A host name is resolved to an address; a network is used as given.
    resolved = nil
    if client.host
      begin
        resolved = IPSocket.getaddress(client.host)
      rescue SocketError
        raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
      end
    end

    begin
      address = IPAddr.new(resolved || client.network)
    rescue ArgumentError
      raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
    end

    permitted_users = client.users ? client.users.split(',') : nil
    @nodes << {
      address: address,
      shared_key: (client.shared_key || @shared_key),
      users: permitted_users
    }
  end

  @generate_cert_common_name ||= @self_hostname

  # To check whether certificates are successfully generated/loaded at startup time
  certificate

  true
end
on_message(msg) click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 282
# Emits the events carried by one decoded forward-protocol message.
#
# +msg+ is an array whose first element is the tag. The second element
# selects the mode:
# * String -- PackedForward: a packed stream of entries, emitted as a
#   MessagePackEventStream.
# * Array  -- Forward: a list of [time, record] entries, emitted as a
#   MultiEventStream.
# * otherwise -- Message: msg is [tag, time, record], emitted as one event.
#
# A time of 0 is replaced by the engine's current time. In Forward mode the
# current time is looked up at most once per message, so every zero-time
# entry of that message gets the same timestamp.
def on_message(msg)
  # NOTE: copy&paste from Fluent::ForwardInput#on_message(msg)

  # TODO: format error
  tag = msg[0].to_s
  entries = msg[1]

  case entries
  when String
    # PackedForward
    es = MessagePackEventStream.new(entries, @cached_unpacker)
    router.emit_stream(tag, es)

  when Array
    # Forward
    # `now` must live outside the block: a variable first assigned inside the
    # block would be block-local and reset on every iteration, defeating ||=.
    now = nil
    es = Fluent::MultiEventStream.new
    entries.each do |e|
      time = e[0].to_i
      time = (now ||= Fluent::Engine.now) if time == 0
      es.add(time, e[1])
    end
    router.emit_stream(tag, es)

  else
    # Message
    time = msg[1]
    time = Fluent::Engine.now if time == 0
    router.emit(tag, time, msg[2])
  end
end
replace_hostname_placeholder(conf, hostname) click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 97
# Replaces every hostname placeholder found in the string-like values of
# +conf+ and, recursively, of all its nested elements with +hostname+.
#
# Values are modified in place on the configuration elements. Values that do
# not respond to both #include? and #gsub (nil, numbers, ...) are left alone.
# Returns the result of iterating the top-level element's children.
def replace_hostname_placeholder(conf, hostname)
  substitute = lambda do |element|
    element.keys.each do |name|
      value = element[name]
      next unless value && value.respond_to?(:include?) && value.respond_to?(:gsub)
      next unless HOSTNAME_PLACEHOLDERS.any? { |placeholder| value.include?(placeholder) }

      element[name] = HOSTNAME_PLACEHOLDERS.inject(value) do |text, placeholder|
        text.gsub(placeholder, hostname)
      end
    end
    # Descend into nested sections.
    element.elements.each { |child| substitute.call(child) }
  end

  substitute.call(conf)
end
run() click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 235
# Body of the listener thread (#start runs this method in a new Thread).
#
# Builds an SSL context from the plugin's certificate/key, binds a TCP server
# on @bind/@port, wraps it in an SSL server stored in @sock, and then accepts
# connections indefinitely, creating one Session per accepted socket.
#
# The method returns only when accept raises an OpenSSL::SSL::SSLError whose
# message starts with 'SSL_accept SYSCALL'; any other SSLError is re-raised.
def run # sslsocket server thread
  log.trace "setup for ssl sessions"
  cert, key = self.certificate

  ctx = OpenSSL::SSL::SSLContext.new(@ssl_version)
  if @secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    ctx.set_params({})

    # Cipher list: explicit configuration wins, otherwise a fixed default.
    if @ssl_ciphers
      ctx.ciphers = @ssl_ciphers
    else
      ### follow httpclient configuration by nahi
      # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
      ctx.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
    end
  end

  ctx.cert = cert
  ctx.key = key
  # @client_ca is set by #certificate only when certificates were loaded from
  # cert_path: it holds the certificates that followed the first one in the file.
  if @client_ca
    ctx.extra_chain_cert = @client_ca
  end

  log.trace "start to listen", bind: @bind, port: @port
  server = TCPServer.new(@bind, @port)
  log.trace "starting SSL server", bind: @bind, port: @port
  @sock = OpenSSL::SSL::SSLServer.new(server, ctx)
  # Do not perform the SSL handshake inside accept; the accepted socket is
  # handed to Session with the SSL session not yet established.
  @sock.start_immediately = false
  begin
    log.trace "accepting sessions"
    loop do
      while socket = @sock.accept
        log.trace "accept tcp connection (ssl session not established yet)"
        @sessions.push Session.new(self, socket)

        # cleanup closed session instance
        @sessions.delete_if(&:closed?)
        # NOTE(review): closed sessions were removed on the previous line, so
        # the `closed:` figure is computed over the already-pruned list.
        log.trace "session instances:", all: @sessions.size, closed: @sessions.select(&:closed?).size
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    raise unless e.message.start_with?('SSL_accept SYSCALL') # signal trap on accept
  end
end
select_authenticate_users(node, username) click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 193
# Returns the configured users (from @users) whose username equals +username+
# and who are permitted for +node+.
#
# When +node+ is nil or has no :users list, every configured user with a
# matching username qualifies. Otherwise the username must also appear in the
# node's :users list.
def select_authenticate_users(node, username)
  permitted = node.nil? ? nil : node[:users]

  @users.select do |candidate|
    (permitted.nil? || permitted.include?(candidate.username)) && candidate.username == username
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 186
# Stops the plugin: kills the listener thread and waits for it to finish,
# shuts down every tracked session, then closes the listening SSL server.
#
# @sock is nil until the listener thread has created the SSL server (#start
# resets it to nil), so it is closed only when present; otherwise shutting
# down before the listener has bound its socket would raise NoMethodError.
def shutdown
  @listener.kill
  @listener.join
  @sessions.each { |session| session.shutdown }
  @sock.close if @sock
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_secure_forward.rb, line 177
# Starts the plugin: seeds OpenSSL's random number generator, resets the
# session list and server socket, and launches the listener thread that
# executes #run.
#
# The listener thread is flagged abort_on_exception so an unhandled error in
# #run is not lost silently. The original statement only read the flag
# (`@listener.abort_on_exception`), which has no effect; the setter is used
# here.
def start
  super
  OpenSSL::Random.seed(SecureRandom.random_bytes(16))
  @sessions = []
  @sock = nil
  @listener = Thread.new(&method(:run))
  @listener.abort_on_exception = true
end