class Fluent::Plugin::SwiftOutput

Constants

DEFAULT_FORMAT_TYPE
MAX_HEX_RANDOM_LENGTH

Attributes

ext[RW]
formatter[RW]
mime_type[RW]
storage[RW]
swift_object_chunk_buffer[RW]
uuid_flush_enabled[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_swift.rb, line 61
# Builds the plugin instance.
#
# Runs the superclass initializer, then starts with the `uuid_flush_enabled`
# flag switched off.
def initialize
  super

  self.uuid_flush_enabled = false
end

Public Instance Methods

configure(config) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_swift.rb, line 66
# Validates the plugin configuration and derives the values used when flushing.
#
# After the superclass has processed +config+, this checks the credentials,
# `hex_random_length` and `index_format`, maps `store_as` to a file extension
# and MIME type, and prepares the formatter, the object key format and the
# per-chunk buffer.
#
# @param config [Object] configuration element, forwarded to the superclass
# @raise [Fluent::ConfigError] when a credential is missing, `hex_random_length`
#   exceeds MAX_HEX_RANDOM_LENGTH, `index_format` is malformed, or `store_as`
#   is `lzo` and the `lzop` executable cannot be found
def configure(config)
  compat_parameters_convert(config, :buffer, :formatter, :inject)

  super

  # `to_s` makes an unset (nil) value fail with the intended ConfigError
  # instead of a NoMethodError on nil.
  if auth_url.to_s.empty?
    raise Fluent::ConfigError, 'auth_url parameter or OS_AUTH_URL variable not defined'
  end
  if auth_user.to_s.empty?
    raise Fluent::ConfigError, 'auth_user parameter or OS_USERNAME variable not defined'
  end
  if auth_api_key.to_s.empty?
    raise Fluent::ConfigError, 'auth_api_key parameter or OS_PASSWORD variable not defined'
  end

  if hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  # \A and \z anchor the whole value; ^ and $ only anchor a single line, which
  # would let a multi-line value through as long as one line looked valid.
  unless index_format =~ /\A%(0\d*)?[dxX]\z/
    raise Fluent::ConfigError, 'index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types'
  end

  self.ext, self.mime_type =
    case store_as
    when 'gzip' then ['gz', 'application/x-gzip']
    when 'lzo'
      # Probe for the external compressor now so a missing binary is reported
      # at configuration time rather than on the first flush.
      begin
        Open3.capture3('lzop -V')
      rescue Errno::ENOENT
        raise Fluent::ConfigError, "'lzop' utility must be in PATH for LZO compression"
      end
      ['lzo', 'application/x-lzop']
    when 'json' then ['json', 'application/json']
    else ['txt', 'text/plain']
    end

  self.formatter = formatter_create
  self.swift_object_key_format = configure_swift_object_key_format
  self.swift_object_chunk_buffer = {}
end
format(tag, time, record) click to toggle source

See the Fluentd formatter plugin API documentation: https://docs.fluentd.org/plugin-development/api-plugin-formatter

# File lib/fluent/plugin/out_swift.rb, line 133
# Serialises one event with the configured formatter.
#
# The record is first passed through `inject_values_to_record`, and the
# result is handed to `formatter.format`.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] whatever `formatter.format` returns
def format(tag, time, record)
  enriched_record = inject_values_to_record(tag, time, record)

  formatter.format(tag, time, enriched_record)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 107
# Always answers true.
# NOTE(review): presumably the hook Fluentd uses to decide whether this plugin
# may run with multiple workers — confirm against the base class.
#
# @return [true]
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_swift.rb, line 111
# Connects to Swift and makes sure the target container is usable.
#
# Sets Excon's `ssl_verify_peer` default from `ssl_verify`, builds the
# OpenStack storage client from the `auth_*` settings, switches account when
# `swift_account` is set, runs `check_container`, and finally calls the
# superclass `start`.
#
# @raise [RuntimeError] when the storage client cannot be created
def start
  Excon.defaults[:ssl_verify_peer] = ssl_verify

  begin
    credentials = {
      provider: 'OpenStack',
      openstack_auth_url: auth_url,
      openstack_username: auth_user,
      openstack_api_key: auth_api_key,
      openstack_tenant: auth_tenant,
      openstack_region: auth_region
    }
    self.storage = Fog::Storage.new(**credentials)
  rescue StandardError => error
    raise "Can't call Swift API. Please check your ENV OS_*, your credentials or `auth_url` configuration. Error: #{error.inspect}"
  end

  storage.change_account(swift_account) if swift_account

  check_container
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 138
# Uploads one buffer chunk to Swift as a single object.
#
# The object name is built from `swift_object_key_format`. While an object
# with the candidate name already exists, the `%{index}` value is incremented
# and the name is rebuilt. The chunk is then staged in a temporary file
# (gzip- or lzop-compressed when `store_as` asks for it) and sent with
# `storage.put_object`.
#
# @param chunk [Object] buffer chunk; must respond to `unique_id`, `metadata`
#   and `write_to(io)`
# @raise [RuntimeError] when two successive candidate names are identical and
#   `overwrite` is off, or when the `lzop` command fails
def write(chunk)
  metadata = chunk.metadata
  placeholder = /%{[^}]+}/

  # The per-chunk values are kept until the upload succeeds, so a repeated
  # call for the same chunk reuses the same %{hex_random}.
  chunk_values = (swift_object_chunk_buffer[chunk.unique_id] ||= {
    '%{hex_random}' => hex_random(chunk: chunk)
  })
  values_for_swift_object_key_pre = {
    '%{path}' => path,
    '%{file_extension}' => ext
  }

  index = 0
  previous_path = nil
  swift_path = nil

  loop do
    # sprintf rather than Kernel#format: this class defines its own
    # #format(tag, time, record).
    # rubocop:disable Style/FormatString
    values_for_swift_object_key_post = {
      '%{index}' => sprintf(index_format, index)
    }.merge(chunk_values)
    # rubocop:enable Style/FormatString

    values_for_swift_object_key_post['%{uuid_flush}'] = uuid_random if uuid_flush_enabled

    # First pass resolves %{path} and %{file_extension} and leaves every other
    # placeholder untouched so extract_placeholders can see it.
    swift_path = swift_object_key_format.gsub(placeholder) do |matched_key|
      values_for_swift_object_key_pre.fetch(matched_key, matched_key)
    end
    swift_path = extract_placeholders(swift_path, metadata)
    # Second pass fills the remaining placeholders; unknown ones become empty.
    swift_path = swift_path.gsub(placeholder, values_for_swift_object_key_post)

    log.info("File flushing: #{swift_path}")

    # An unchanged name after bumping the index means the key format cannot
    # produce a fresh name.
    if index.positive? && (swift_path == previous_path)
      unless overwrite
        raise "Duplicated path is generated. Use %{index} in swift_object_key_format: Path: #{swift_path}"
      end

      log.warn("File: #{swift_path} already exists, but will overwrite!")
      break
    end

    index += 1
    previous_path = swift_path
    break unless check_object_exists(object: swift_path)
  end

  tmp = Tempfile.new('swift-')
  tmp.binmode
  gzip_writer = nil
  lzo_input = nil
  begin
    case store_as
    when 'gzip'
      gzip_writer = Zlib::GzipWriter.new(tmp)
      chunk.write_to(gzip_writer)
      gzip_writer.close
    when 'lzo'
      lzo_input = Tempfile.new('chunk-tmp')
      chunk.write_to(lzo_input)
      lzo_input.close
      tmp.close
      # Argument-vector form avoids shell word-splitting of the temp paths, and
      # a failed compression must not be uploaded as if it had succeeded.
      unless system('lzop', '-qf1', '-o', tmp.path, lzo_input.path)
        raise "lzop failed to compress chunk for #{swift_path}"
      end
    else
      chunk.write_to(tmp)
      tmp.close
    end

    File.open(tmp.path) do |file|
      storage.put_object(
        swift_container,
        swift_path,
        file,
        content_type: mime_type
      )
      swift_object_chunk_buffer.delete(chunk.unique_id)
    end
  ensure
    # Best-effort cleanup: a failure here must not replace the real outcome.
    begin
      tmp.close(true)
    rescue StandardError
      nil
    end
    begin
      gzip_writer.close if gzip_writer && !gzip_writer.closed?
    rescue StandardError
      nil
    end
    begin
      lzo_input&.close(true)
    rescue StandardError
      nil
    end
  end
end

Private Instance Methods

check_container() click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 245
# Verifies that `swift_container` exists, creating it when allowed.
#
# A NotFound answer from `storage.get_container` leads to either creating the
# container (when `auto_create_container` is truthy) or raising.
#
# @raise [RuntimeError] when the container is missing and must not be created
def check_container
  storage.get_container(swift_container)
rescue Fog::OpenStack::Storage::NotFound
  raise "The specified container does not exist: #{swift_container}." unless auto_create_container

  # Plugin logger instead of the global $log, matching the other log calls in this class.
  log.warn("Creating container `#{swift_container}` on `#{auth_url}`, `#{swift_account}`.")
  storage.put_container(swift_container)
end
check_object_exists(object:) click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 283
# Tells whether an object is already stored in `swift_container`.
#
# Issues `storage.head_object`; a NotFound error means the object is absent,
# any other outcome without an error means it is present.
#
# @param object [String] object name to look up
# @return [Boolean]
def check_object_exists(object:)
  begin
    storage.head_object(swift_container, object)
  rescue Fog::OpenStack::Storage::NotFound
    return false
  end

  true
end
configure_swift_object_key_format() click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 256
# Checks `swift_object_key_format` and returns it with `%{hostname}` resolved.
#
# - Rejects the removed `%{uuid}`, `%{uuid:random}`, `%{uuid:hostname}` and
#   `%{uuid:timestamp}` placeholders.
# - When `%{uuid_flush}` is present, loads `uuidtools`, verifies that a UUID
#   can be generated, and turns `uuid_flush_enabled` on.
# - Replaces each `%{hostname}` with `Socket.gethostname`, logging a warning
#   for every replacement.
#
# @return [String] the key format with `%{hostname}` substituted
# @raise [Fluent::ConfigError] on a removed placeholder, a missing `uuidtools`
#   gem, or a failing UUID generation
def configure_swift_object_key_format
  removed_placeholders = %w[%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}]
  offending = removed_placeholders.find { |candidate| swift_object_key_format.include?(candidate) }
  raise Fluent::ConfigError, "#{offending} placeholder in swift_object_key_format is removed" if offending

  if swift_object_key_format.include?('%{uuid_flush}')
    # Loaded lazily so the gem is only needed when the placeholder is used.
    begin
      require 'uuidtools'
    rescue LoadError
      raise Fluent::ConfigError, 'uuidtools gem not found. Install uuidtools gem first'
    end

    # Trial run: surface a broken UUID source at configuration time.
    begin
      uuid_random
    rescue StandardError => error
      raise Fluent::ConfigError, "Generating uuid doesn't work. Can't use %{uuid_flush} on this environment. #{error}"
    end

    self.uuid_flush_enabled = true
  end

  swift_object_key_format.gsub('%{hostname}') do
    log.warn("%{hostname} will be removed in the future. Use `#{Socket.gethostname}` instead.")
    Socket.gethostname
  end
end
hex_random(chunk:) click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 235
# Derives the `%{hex_random}` value for a chunk.
#
# Takes the hex form of the chunk's unique id, reverses it, and keeps the
# first `hex_random_length` characters.
#
# @param chunk [Object] buffer chunk responding to `unique_id`
# @return [String]
def hex_random(chunk:)
  reversed_hex = Fluent::UniqueId.hex(chunk.unique_id).reverse
  reversed_hex[0...hex_random_length]
end
uuid_random() click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 241
# Produces a fresh random UUID via UUIDTools.
#
# @return [String] string form of the generated UUID
def uuid_random
  generated = ::UUIDTools::UUID.random_create
  generated.to_s
end