class Fluent::Plugin::SwiftOutput
Constants
- DEFAULT_FORMAT_TYPE
- MAX_HEX_RANDOM_LENGTH
Attributes
ext[RW]
formatter[RW]
mime_type[RW]
storage[RW]
swift_object_chunk_buffer[RW]
uuid_flush_enabled[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_swift.rb, line 61 def initialize super self.uuid_flush_enabled = false end
Public Instance Methods
configure(config)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_swift.rb, line 66 def configure(config) compat_parameters_convert(config, :buffer, :formatter, :inject) super if auth_url.empty? raise Fluent::ConfigError, 'auth_url parameter or OS_AUTH_URL variable not defined' end if auth_user.empty? raise Fluent::ConfigError, 'auth_user parameter or OS_USERNAME variable not defined' end if auth_api_key.empty? raise Fluent::ConfigError, 'auth_api_key parameter or OS_PASSWORD variable not defined' end if hex_random_length > MAX_HEX_RANDOM_LENGTH raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}" end unless index_format =~ /^%(0\d*)?[dxX]$/ raise Fluent::ConfigError, 'index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types' end self.ext, self.mime_type = case store_as when 'gzip' then ['gz', 'application/x-gzip'] when 'lzo' then begin Open3.capture3('lzop -V') rescue Errno::ENOENT raise ConfigError, "'lzop' utility must be in PATH for LZO compression" end ['lzo', 'application/x-lzop'] when 'json' then ['json', 'application/json'] else ['txt', 'text/plain'] end self.formatter = formatter_create self.swift_object_key_format = configure_swift_object_key_format self.swift_object_chunk_buffer = {} end
format(tag, time, record)
click to toggle source
docs.fluentd.org/plugin-development/api-plugin-formatter
# File lib/fluent/plugin/out_swift.rb, line 133 def format(tag, time, record) new_record = inject_values_to_record(tag, time, record) formatter.format(tag, time, new_record) end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 107 def multi_workers_ready? true end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_swift.rb, line 111 def start Excon.defaults[:ssl_verify_peer] = ssl_verify begin self.storage = Fog::Storage.new( provider: 'OpenStack', openstack_auth_url: auth_url, openstack_username: auth_user, openstack_api_key: auth_api_key, openstack_tenant: auth_tenant, openstack_region: auth_region ) rescue StandardError => e raise "Can't call Swift API. Please check your ENV OS_*, your credentials or `auth_url` configuration. Error: #{e.inspect}" end if swift_account storage.change_account(swift_account) end check_container super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 138 def write(chunk) i = 0 metadata = chunk.metadata previous_path = nil begin swift_object_chunk_buffer[chunk.unique_id] ||= { '%{hex_random}' => hex_random(chunk: chunk) } values_for_swift_object_key_pre = { '%{path}' => path, '%{file_extension}' => ext } # rubocop:disable Style/FormatString values_for_swift_object_key_post = { '%{index}' => sprintf(index_format, i) }.merge!(swift_object_chunk_buffer[chunk.unique_id]) # rubocop:enable Style/FormatString if uuid_flush_enabled values_for_swift_object_key_post['%{uuid_flush}'] = uuid_random end swift_path = swift_object_key_format.gsub(/%{[^}]+}/) do |matched_key| values_for_swift_object_key_pre.fetch(matched_key, matched_key) end swift_path = extract_placeholders(swift_path, metadata) swift_path = swift_path.gsub(/%{[^}]+}/, values_for_swift_object_key_post) $log.info("File flushing: #{swift_path}") if i.positive? && (swift_path == previous_path) if overwrite log.warn("File: #{swift_path} already exists, but will overwrite!") break else raise "Duplicated path is generated. Use %{index} in swift_object_key_format: Path: #{swift_path}" end end i += 1 previous_path = swift_path end while check_object_exists(object: swift_path) tmp = Tempfile.new('swift-') tmp.binmode begin if store_as == 'gzip' w = Zlib::GzipWriter.new(tmp) chunk.write_to(w) w.close elsif store_as == 'lzo' w = Tempfile.new('chunk-tmp') chunk.write_to(w) w.close tmp.close system "lzop -qf1 -o #{tmp.path} #{w.path}" else chunk.write_to(tmp) tmp.close end File.open(tmp.path) do |file| storage.put_object( swift_container, swift_path, file, content_type: mime_type ) swift_object_chunk_buffer.delete(chunk.unique_id) end ensure begin tmp.close(true) rescue StandardError nil end begin w.close rescue StandardError nil end begin w.unlink rescue StandardError nil end end end
Private Instance Methods
check_container()
click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 245 def check_container storage.get_container(swift_container) rescue Fog::OpenStack::Storage::NotFound if auto_create_container $log.warn("Creating container `#{swift_container}` on `#{auth_url}`, `#{swift_account}`.") storage.put_container(swift_container) else raise "The specified container does not exist: #{swift_container}." end end
check_object_exists(object:)
click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 283 def check_object_exists(object:) storage.head_object(swift_container, object) true rescue Fog::OpenStack::Storage::NotFound false end
configure_swift_object_key_format()
click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 256 def configure_swift_object_key_format %w[%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}].each do |ph| if swift_object_key_format.include?(ph) raise Fluent::ConfigError, %(#{ph} placeholder in swift_object_key_format is removed) end end if swift_object_key_format.include?('%{uuid_flush}') begin require 'uuidtools' rescue LoadError raise Fluent::ConfigError, 'uuidtools gem not found. Install uuidtools gem first' end begin uuid_random rescue StandardError => e raise Fluent::ConfigError, "Generating uuid doesn't work. Can't use %{uuid_flush} on this environment. #{e}" end self.uuid_flush_enabled = true end swift_object_key_format.gsub('%{hostname}') do |_expr| log.warn("%{hostname} will be removed in the future. Use `#{Socket.gethostname}` instead.") Socket.gethostname end end
hex_random(chunk:)
click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 235 def hex_random(chunk:) unique_hex = Fluent::UniqueId.hex(chunk.unique_id) unique_hex.reverse! unique_hex[0...hex_random_length] end
uuid_random()
click to toggle source
# File lib/fluent/plugin/out_swift.rb, line 241 def uuid_random ::UUIDTools::UUID.random_create.to_s end