class Fluent::Plugin::Output
Constants
- BUFFER_STATS_KEYS
- CHUNKING_FIELD_WARN_NUM
- CHUNK_ID_PLACEHOLDER_PATTERN
- CHUNK_KEY_PATTERN
- CHUNK_KEY_PLACEHOLDER_PATTERN
- CHUNK_TAG_PLACEHOLDER_PATTERN
- DequeuedChunkInfo
- FORMAT_COMPRESSED_MSGPACK_STREAM
- FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT
- FORMAT_MSGPACK_STREAM
- FORMAT_MSGPACK_STREAM_TIME_INT
- FlushThreadState
Internal states
- TIMESTAMP_CHECK_BASE_TIME
- TIME_KEY_PLACEHOLDER_THRESHOLDS
- UNRECOVERABLE_ERRORS
Attributes
for tests
for tests
for tests
for tests
for tests
for tests
output_enqueue_thread_waiting
: for tests of output.rb itself
for tests
Public Class Methods
Fluent::PluginLoggerMixin::new
# File lib/fluent/plugin/output.rb, line 198
# Puts every piece of per-instance state into its pre-#configure default.
# The processing mode (@buffering) is guessed here from the methods the
# plugin class implements; #configure and #start may refine that guess.
def initialize
  super
  @counter_mutex = Mutex.new
  @flush_thread_mutex = Mutex.new
  @buffering = false
  @delayed_commit = false
  @as_secondary = false
  @primary_instance = nil

  # TODO: well organized counters
  # The metric objects themselves are created in #configure.
  %w[num_errors emit_count emit_records emit_size write_count rollback_count flush_time_count slow_flush_count].each do |metric|
    instance_variable_set(:"@#{metric}_metrics", nil)
  end
  @enable_size_metrics = false

  # nil means "undecided": the plugin supports both synchronous and
  # buffered/delayed-commit processing, so #configure or #start settles it.
  @buffering =
    if !implement?(:synchronous)
      true
    elsif implement?(:buffered) || implement?(:delayed_commit)
      nil
    else
      false
    end
  @custom_format = implement?(:custom_format)
  @enable_msgpack_streamer = false # decided later

  # Buffer, secondary, retry and flush-thread state are set up later.
  @buffer = nil
  @secondary = nil
  @retry = nil
  @dequeued_chunks = nil
  @dequeued_chunks_mutex = nil
  @output_enqueue_thread = nil
  @output_flush_threads = nil
  @output_flush_thread_current_position = 0

  # Chunking settings, derived from the <buffer> section in #configure.
  @simple_chunking = nil
  @chunk_keys = @chunk_key_accessors = @chunk_key_time = @chunk_key_tag = nil
  @flush_mode = nil
  @timekey_zone = nil

  @retry_for_error_chunk = false
end
Public Instance Methods
# File lib/fluent/plugin/output.rb, line 248
# Turns this plugin into the <secondary> output of +primary+.
#
# Chunking settings (chunk keys, tag/time chunk keys, timekey zone) and the
# event router are copied from the primary. #commit_write and #rollback_write
# are redefined on this instance so that they act on the primary's buffer.
#
# @param primary [Fluent::Plugin::Output] the primary output plugin
def acts_as_secondary(primary)
  @as_secondary = true
  @primary_instance = primary
  @chunk_keys = @primary_instance.chunk_keys || []
  @chunk_key_tag = @primary_instance.chunk_key_tag || false
  if @primary_instance.chunk_key_time
    @chunk_key_time = @primary_instance.chunk_key_time
    @timekey_zone = @primary_instance.timekey_zone
    @output_time_formatter_cache = {}
  end
  self.context_router = primary.context_router

  singleton_class.module_eval do
    define_method(:commit_write) { |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
    # The primary's #rollback_write takes update_retry as a KEYWORD argument.
    # Passing it positionally raised ArgumentError (given 2, expected 1).
    define_method(:rollback_write) { |chunk_id, update_retry: true| @primary_instance.rollback_write(chunk_id, update_retry: update_retry) }
  end
end
# File lib/fluent/plugin/output.rb, line 603
# Number of flush threads actually serving this plugin: 0 when it does not
# buffer, otherwise the flush_thread_count of its own <buffer> config or,
# for a secondary, of its primary's config.
def actual_flush_thread_count
  return 0 unless @buffering

  config = @as_secondary ? @primary_instance.buffer_config : @buffer_config
  config.flush_thread_count
end
Fluent::Plugin::Base#after_shutdown
# File lib/fluent/plugin/output.rb, line 563
# Final shutdown stage. It rolls back outstanding writes (primaries only) and
# forwards the stage to the secondary. It then stops the buffer and asks every
# flush thread to exit, joining each one.
def after_shutdown
  # Roll back regardless of @delayed_commit, because the secondary may do it.
  try_rollback_all if @buffering && !@as_secondary
  @secondary.after_shutdown if @secondary

  if @buffering && @buffer
    @buffer.after_shutdown

    @output_flush_threads_running = false
    threads = @output_flush_threads
    if threads && !threads.empty?
      threads.each do |state|
        # Wake the thread so that it notices the stop flag and exits by itself.
        state.mutex.synchronize do
          thread = state.thread
          if thread && thread.status
            state.next_clock = 0
            state.cond_var.signal
          end
        end
        Thread.pass
        state.thread.join
      end
    end
  end

  super
end
Fluent::Plugin::Base#after_start
# File lib/fluent/plugin/output.rb, line 525
# Runs the base-class hook, then forwards the after_start stage to the
# <secondary> plugin when one is configured.
def after_start
  super
  return unless @secondary

  @secondary.after_start
end
# File lib/fluent/plugin/output.rb, line 1287
# Saves +chunk+ through the buffer's backup facility and then commits it.
# Committing purges the chunk from the buffer. When chunk backups are
# disabled, the chunk is only logged as thrown away before being committed.
#
# @param chunk [Fluent::Plugin::Buffer::Chunk] chunk to back up
# @param using_secondary [Boolean] forwarded to #commit_write as +secondary:+
# @param delayed_commit [Boolean] forwarded to #commit_write as +delayed:+
def backup_chunk(chunk, using_secondary, delayed_commit)
  backup_disabled = @buffer.disable_chunk_backup
  if backup_disabled
    log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away"
  else
    @buffer.backup(chunk.unique_id) do |io|
      chunk.write_to(io)
    end
  end
  commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit)
end
Fluent::Plugin::Base#before_shutdown
# File lib/fluent/plugin/output.rb, line 537
# First shutdown stage. It forwards the stage to the secondary. When
# buffering, it flushes if flush_at_shutdown is set, notifies the buffer,
# and stops and joins the enqueue thread.
def before_shutdown
  @secondary.before_shutdown if @secondary

  if @buffering && @buffer
    force_flush if @flush_at_shutdown
    @buffer.before_shutdown

    # Enqueueing must stop here: after #shutdown no data can be written any more.
    @output_enqueue_thread_running = false
    enqueue_thread = @output_enqueue_thread
    if enqueue_thread && enqueue_thread.alive?
      enqueue_thread.wakeup
      enqueue_thread.join
    end
  end

  super
end
# File lib/fluent/plugin/output.rb, line 948
# Truncates +time+ down to the start of its timekey bucket, returning an
# Integer. With timekey_use_utc the buckets are aligned on UTC. Otherwise
# the timezone offset shifts the bucket boundaries; the offset is either
# fixed or computed per time by @calculate_offset.
def calculate_timekey(time)
  seconds = time.to_i
  shift =
    if @timekey_use_utc
      0
    else
      @calculate_offset ? @calculate_offset.call(time) : @offset
    end
  (seconds - ((seconds + shift) % @timekey)).to_i
end
# File lib/fluent/plugin/output.rb, line 1298
# Adds the flush duration (elapsed Fluent::Clock time * 1000, truncated) to
# the flush_time_count metric. When the elapsed time exceeds
# slow_flush_log_threshold, it also counts a slow flush and logs a warning.
#
# @param start [Numeric] Fluent::Clock.now value taken when the flush began
def check_slow_flush(start)
  elapsed_time = Fluent::Clock.now - start
  @flush_time_count_metrics.add((elapsed_time * 1000).to_i)
  return unless elapsed_time > @slow_flush_log_threshold

  @slow_flush_count_metrics.inc
  log.warn "buffer flush took longer time than slow_flush_log_threshold:",
           elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
end
# File lib/fluent/plugin/output.rb, line 958
# Test helper: builds an in-memory chunk whose metadata is derived from
# +tag+, +time+ and +record+ by #metadata.
def chunk_for_test(tag, time, record)
  require 'fluent/plugin/buffer/memory_chunk'

  chunk_metadata = metadata(tag, time, record)
  Fluent::Plugin::Buffer::MemoryChunk.new(chunk_metadata)
end
Fluent::Plugin::Base#close
# File lib/fluent/plugin/output.rb, line 589
# Closes the buffer (only when this plugin buffers and has one) and the
# <secondary> plugin, then runs the base-class #close.
def close
  if @buffering && @buffer
    @buffer.close
  end
  if @secondary
    @secondary.close
  end
  super
end
# File lib/fluent/plugin/output.rb, line 1102
# Marks the write of +chunk_id+ as successful. For delayed commits it drops
# the chunk from the dequeued-chunk list. It then purges the chunk from the
# buffer and, if a retry was pending, logs that the retry succeeded and
# clears the retry state.
#
# @param chunk_id [String] unique id of the chunk
# @param delayed [Boolean] whether the chunk was written with delayed commit
# @param secondary [Boolean] whether the secondary output performed the write
def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
  log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed }

  if delayed
    @dequeued_chunks_mutex.synchronize do
      @dequeued_chunks.reject! { |info| info.chunk_id == chunk_id }
    end
  end
  @buffer.purge_chunk(chunk_id)

  @retry_mutex.synchronize do
    next unless @retry # no retry in progress, nothing to reset

    # A flush succeeded while retries were pending.
    message = secondary ? "retry succeeded by secondary." : "retry succeeded."
    log.warn message, chunk_id: dump_unique_id_hex(chunk_id)
    @retry = nil
  end
end
Fluent::PluginLoggerMixin#configure
# configure(conf) -- reviewer notes. This extract collapses the method onto
# a few physical lines, so the `#` comments inside it swallow the code that
# follows them; read lib/fluent/plugin/output.rb for the real line layout.
#
# What the body below does:
# * raises RuntimeError ("BUG: ...") unless the plugin implements at least
#   one of the synchronous / buffered / delayed_commit interfaces;
# * creates the output metrics (num_errors, emit_count, emit_records,
#   emit_size, write_count, rollback_count, flush_time_count,
#   slow_flush_count);
# * decides @buffering: true with a <buffer> section; false for
#   synchronous-only plugins; nil (decided later in #start) when both modes
#   are supported. A secondary must end up buffering;
# * for primaries that buffer (or may buffer):
#   - parses chunk keys ('time' and 'tag' are special) and validates the
#     timekey settings;
#   - picks flush_mode;
#   - creates and configures the buffer plugin and the flush_at_shutdown
#     default (true for non-persistent buffers);
# * builds the <secondary> plugin, which may not contain <buffer> or
#   <secondary> sections itself.
# Returns self. Configuration problems raise Fluent::ConfigError.
# NOTE(review): "plugin in not allowed" in the secondary error message is a
# typo for "is not". It is left unchanged because callers or tests may
# match the text.
# File lib/fluent/plugin/output.rb, line 266 def configure(conf) unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit) raise "BUG: output plugin must implement some methods. see developer documents." end has_buffer_section = (conf.elements(name: 'buffer').size > 0) has_flush_interval = conf.has_key?('flush_interval') super @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_count", help_text: "Number of count emits") @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") if has_buffer_section unless implement?(:buffered) || implement?(:delayed_commit) raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering" end @buffering = true else # no buffer sections if implement?(:synchronous) if !implement?(:buffered) && !implement?(:delayed_commit) if @as_secondary raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't."
end @buffering = false else if @as_secondary # secondary plugin always works as buffered plugin without buffer instance @buffering = true else # @buffering.nil? shows that enabling buffering or not will be decided in lazy way in #start @buffering = nil end end else # buffered or delayed_commit is supported by `unless` of first line in this method @buffering = true end end # Enable to update record size metrics or not @enable_size_metrics = !!system_config.enable_size_metrics if @as_secondary if !@buffering && !@buffering.nil? raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't" end end if (@buffering || @buffering.nil?) && !@as_secondary # When @buffering.nil?, @buffer_config was initialized with default value for all parameters. # If so, this configuration MUST success. @chunk_keys = @buffer_config.chunk_keys.dup @chunk_key_time = !!@chunk_keys.delete('time') @chunk_key_tag = !!@chunk_keys.delete('tag') if @chunk_keys.any? { |key| begin k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key) if k.is_a?(String) k !~ CHUNK_KEY_PATTERN else if key.start_with?('$[') raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed" else false end end rescue => e raise Fluent::ConfigError, "in chunk_keys: #{e.message}" end } raise Fluent::ConfigError, "chunk_keys specification includes invalid char" else @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }] end if @chunk_key_time raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey Fluent::Timezone.validate!(@buffer_config.timekey_zone) @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone @timekey = @buffer_config.timekey if @timekey <= 0 raise Fluent::ConfigError, "timekey should be greater than 0.
current timekey: #{@timekey}" end @timekey_use_utc = @buffer_config.timekey_use_utc @offset = Fluent::Timezone.utc_offset(@timekey_zone) @calculate_offset = @offset.respond_to?(:call) ? @offset : nil @output_time_formatter_cache = {} end if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM log.warn "many chunk keys specified, and it may cause too many chunks on your system." end # no chunk keys or only tags (chunking can be done without iterating event stream) @simple_chunking = !@chunk_key_time && @chunk_keys.empty? @flush_mode = @buffer_config.flush_mode if @flush_mode == :default if has_flush_interval log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour" @flush_mode = :interval else @flush_mode = (@chunk_key_time ? :lazy : :interval) end end buffer_type = @buffer_config[:@type] buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, []) @buffer = Plugin.new_buffer(buffer_type, parent: self) @buffer.configure(buffer_conf) keep_buffer_config_compat @buffer.enable_update_timekeys if @chunk_key_time @flush_at_shutdown = @buffer_config.flush_at_shutdown if @flush_at_shutdown.nil? @flush_at_shutdown = if @buffer.persistent? false else true # flush_at_shutdown is true in default for on-memory buffer end elsif !@flush_at_shutdown && !@buffer.persistent? buf_type = Plugin.lookup_type_from_class(@buffer.class) log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer." log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
end if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval') if buffer_conf.has_key?('flush_mode') raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'" else log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'" end end if @buffer.queued_chunks_limit_size.nil? @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count end end if @secondary_config raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary if @buffer_config.retry_forever log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary" end secondary_type = @secondary_config[:@type] unless secondary_type secondary_type = conf['@type'] # primary plugin type end secondary_conf = conf.elements(name: 'secondary').first @secondary = Plugin.new_output(secondary_type) unless @secondary.respond_to?(:acts_as_secondary) raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output" end @secondary.acts_as_secondary(self) @secondary.configure(secondary_conf) if (@secondary.class.to_s != "Fluent::Plugin::SecondaryFileOutput") && (self.class != @secondary.class) && (@custom_format || @secondary.implement?(:custom_format)) log.warn "Use different plugin for secondary. Check the plugin works with primary like secondary_file", primary: self.class.to_s, secondary: @secondary.class.to_s end else @secondary = nil end self end
# File lib/fluent/plugin/output.rb, line 894
# Buffered emit path. It chunks +es+ into the buffer, enqueueing at once in
# :immediate flush mode. When no retry is pending and something is queued,
# it triggers a flush. Any error increments num_errors and is re-raised.
def emit_buffered(tag, es)
  @emit_count_metrics.inc
  begin
    execute_chunking(tag, es, enqueue: @flush_mode == :immediate)
    submit_flush_once if !@retry && @buffer.queued?(nil, optimistic: true)
  rescue
    # TODO: separate number of errors into emit errors and write/flush errors
    @num_errors_metrics.inc
    raise
  end
end
# File lib/fluent/plugin/output.rb, line 178
# Current value of the emit_count metric, which is incremented on every
# emit call (buffered or synchronous).
def emit_count
  metric = @emit_count_metrics
  metric.get
end
# File lib/fluent/plugin/output.rb, line 873
# Default event dispatcher. It routes to #emit_buffered when buffering and
# to #emit_sync otherwise. Once the buffering mode is known, #start replaces
# this method on the instance with one of those two methods directly.
def emit_events(tag, es)
  @buffering ? emit_buffered(tag, es) : emit_sync(tag, es)
end
# File lib/fluent/plugin/output.rb, line 186
# Current value of the emit_records metric: the total number of events
# received by the emit paths.
def emit_records
  metric = @emit_records_metrics
  metric.get
end
# File lib/fluent/plugin/output.rb, line 182
# Current value of the emit_size metric: the msgpack byte size of emitted
# events. It is only updated when size metrics are enabled.
def emit_size
  metric = @emit_size_metrics
  metric.get
end
# File lib/fluent/plugin/output.rb, line 882
# Non-buffered emit path. It hands +es+ straight to #process and updates the
# record and size metrics. Any error from that work increments num_errors
# and is re-raised.
def emit_sync(tag, es)
  @emit_count_metrics.inc
  begin
    process(tag, es)
    @emit_records_metrics.add(es.size)
    @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  rescue StandardError
    @num_errors_metrics.inc
    raise
  end
end
# enqueue_thread_run -- reviewer notes. The method body below is collapsed
# onto one line in this extract.
#
# Body of the enqueue thread. It works as follows:
# * It derives a polling interval from flush_interval (interval flush mode)
#   and/or min(timekey, timekey_wait). The timekey value is used when chunking
#   by time and timekey is smaller than flush_interval, or when no
#   flush_interval applies.
# * The interval is divided by 11.0 and never goes below
#   flush_thread_interval. If neither source applies, it raises RuntimeError
#   ("BUG: ...").
# * It polls every 0.5s until the plugin has started (or is stopped).
# * While @output_enqueue_thread_running, it holds
#   @output_enqueue_thread_mutex and enqueues two kinds of chunks:
#   - chunks whose raw_create_at + flush_interval <= Time.now.to_i
#     (interval mode);
#   - chunks whose timekey bucket plus timekey_wait has passed (time chunk
#     key).
#   It then sleeps for the interval.
# * While @output_flush_interrupted is set (tests only), it just sleeps.
# Errors during a check are logged and ignored; they are re-raised when
# @under_plugin_development. @output_enqueue_thread_waiting is reset after
# every check, which #enqueue_thread_wait polls for. Errors outside the
# inner check are logged and re-raised.
# File lib/fluent/plugin/output.rb, line 1445 def enqueue_thread_run value_for_interval = nil if @flush_mode == :interval value_for_interval = @buffer_config.flush_interval end if @chunk_key_time if !value_for_interval || @buffer_config.timekey < value_for_interval value_for_interval = [@buffer_config.timekey, @buffer_config.timekey_wait].min end end unless value_for_interval raise "BUG: both of flush_interval and timekey are disabled" end interval = value_for_interval / 11.0 if interval < @buffer_config.flush_thread_interval interval = @buffer_config.flush_thread_interval end while !self.after_started? && !self.stopped? sleep 0.5 end log.debug "enqueue_thread actually running" begin while @output_enqueue_thread_running now_int = Time.now.to_i if @output_flush_interrupted sleep interval next end @output_enqueue_thread_mutex.lock begin if @flush_mode == :interval flush_interval = @buffer_config.flush_interval.to_i # This block should be done by integer values. # If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s. # If we use integered values for this comparison, expected actual flush timing is 1.0s. @buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int } end if @chunk_key_time timekey_unit = @buffer_config.timekey timekey_wait = @buffer_config.timekey_wait current_timekey = now_int - now_int % timekey_unit @buffer.enqueue_all{ |metadata, chunk| metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int } end rescue => e raise if @under_plugin_development log.error "unexpected error while checking flushed chunks. ignored.", error: e log.error_backtrace ensure @output_enqueue_thread_waiting = false @output_enqueue_thread_mutex.unlock end sleep interval end rescue => e # normal errors are rescued by inner begin-rescue clause. log.error "error on enqueue thread", error: e log.error_backtrace raise end end
Only for tests of output plugins.
# File lib/fluent/plugin/output.rb, line 1421
# Test helper. It re-enables enqueueing by clearing
# @output_flush_interrupted, then busy-waits until the enqueue thread
# finishes one check and resets @output_enqueue_thread_waiting.
# Gives up with Timeout::Error after 10 seconds.
def enqueue_thread_wait
  @output_enqueue_thread_mutex.synchronize do
    @output_flush_interrupted = false
    @output_enqueue_thread_waiting = true
  end

  require 'timeout'
  Timeout.timeout(10) do
    loop do
      break unless @output_enqueue_thread_waiting
      Thread.pass
    end
  end
end
# File lib/fluent/plugin/output.rb, line 965
# Routes an event stream to the chunking strategy chosen at configure time:
# * simple chunking, when no per-event iteration is needed;
# * custom format, when the plugin implements #format;
# * the standard msgpack event-stream format otherwise.
def execute_chunking(tag, es, enqueue: false)
  case
  when @simple_chunking then handle_stream_simple(tag, es, enqueue: enqueue)
  when @custom_format then handle_stream_with_custom_format(tag, es, enqueue: enqueue)
  else handle_stream_with_standard_format(tag, es, enqueue: enqueue)
  end
end
TODO: optimize this code
# extract_placeholders(str, chunk) -- reviewer notes. The method body is
# split across two collapsed lines in this extract.
#
# Expands placeholders in +str+ for the given chunk and returns the result.
# +chunk+ should be a Fluent::Plugin::Buffer::Chunk. Older plugins pass the
# chunk's metadata instead; in that case ${chunk_id} cannot be resolved and
# a warning is logged.
# * Empty metadata: only ${chunk_id} is expanded.
# * Otherwise, in this order:
#   1. strftime-style time formatting with the chunk's timekey (when
#      chunking by time; formatters are cached per template);
#   2. ${tag} and ${tag[N]} / ${tag[-N]} (when chunking by tag);
#   3. ${chunk_id};
#   4. ${key} for each configured chunk key, taken from metadata.variables.
#      Unknown chunk-key placeholders become '' with a warning.
# NOTE(review): ${chunk_id} is expanded with String#sub, so only its first
# occurrence is replaced. When it cannot be resolved, it is replaced by the
# block's value (the return value of log.warn). Confirm this is intended.
# File lib/fluent/plugin/output.rb, line 798 def extract_placeholders(str, chunk) metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk) chunk_passed = true chunk.metadata else chunk_passed = false # For existing plugins. Old plugin passes Chunk.metadata instead of Chunk chunk end if metadata.empty? str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { if chunk_passed dump_unique_id_hex(chunk.unique_id) else log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" end } else rvalue = str.dup # strftime formatting if @chunk_key_time # this section MUST be earlier than rest to use raw 'str' @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str) rvalue = @output_time_formatter_cache[str].call(metadata.timekey) end # ${tag}, ${tag[0]}, ${tag[1]}, ... , ${tag[-2]}, ${tag[-1]} if @chunk_key_tag if str.include?('${tag}') rvalue = rvalue.gsub('${tag}', metadata.tag) end if CHUNK_TAG_PLACEHOLDER_PATTERN.match?(str) hash = {} tag_parts = metadata.tag.split('.') tag_parts.each_with_index do |part, i| hash["${tag[#{i}]}"] = part hash["${tag[#{i-tag_parts.size}]}"] = part end rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash) end if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}" end end # First we replace ${chunk_id} with chunk.unique_id (hexlified). rvalue = rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { if chunk_passed dump_unique_id_hex(chunk.unique_id) else log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" end } # Then, replace other ${chunk_key}s. if !@chunk_keys.empty?
&& metadata.variables hash = {'${tag}' => '${tag}'} # not to erase this wrongly @chunk_keys.each do |key| hash["${#{key}}"] = metadata.variables[key.to_sym] end rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN) do |matched| hash.fetch(matched) do log.warn "chunk key placeholder '#{matched[2..-2]}' not replaced. template:#{str}" '' end end end if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}" end rvalue end end
# flush_thread_run(state) -- reviewer notes. The method body below is
# collapsed onto one line in this extract.
#
# Body of one flush thread. +state+ is the thread's FlushThreadState
# (mutex, cond_var, next_clock). It polls every 0.5s until the plugin has
# started (or is stopped). It then holds state.mutex and loops while
# @output_flush_threads_running:
# * if state.next_clock is still in the future, or a pending retry's
#   next_time is in the future, it waits for that long;
# * otherwise it releases the mutex, calls try_flush, and sets
#   state.next_clock from next_flush_time;
# * if the oldest @dequeued_chunks entry reports expired? and flushes are
#   not interrupted, it releases the mutex and calls try_rollback_write;
# * it then waits on state.cond_var for up to the computed interval.
#   after_shutdown and flush_thread_wakeup set next_clock = 0 and signal
#   the condition variable to wake it early.
# The mutex is released around try_flush / try_rollback_write, presumably
# so that waker threads are not blocked meanwhile. Unexpected errors are
# logged and re-raised. The mutex is always unlocked on exit.
# File lib/fluent/plugin/output.rb, line 1510 def flush_thread_run(state) flush_thread_interval = @buffer_config.flush_thread_interval state.next_clock = Fluent::Clock.now + flush_thread_interval while !self.after_started? && !self.stopped? sleep 0.5 end log.debug "flush_thread actually running" state.mutex.lock begin # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase while @output_flush_threads_running current_clock = Fluent::Clock.now next_retry_time = nil @retry_mutex.synchronize do next_retry_time = @retry ? @retry.next_time : nil end if state.next_clock > current_clock interval = state.next_clock - current_clock elsif next_retry_time && next_retry_time > Time.now interval = next_retry_time.to_f - Time.now.to_f else state.mutex.unlock begin try_flush # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying) interval = next_flush_time.to_f - Time.now.to_f # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected # because @retry still exists (#commit_write is not called yet in #try_flush) # @retry should be cleared if delayed commit is enabled? Or any other solution? state.next_clock = Fluent::Clock.now + interval ensure state.mutex.lock end end if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? } unless @output_flush_interrupted state.mutex.unlock begin try_rollback_write ensure state.mutex.lock end end end state.cond_var.wait(state.mutex, interval) if interval > 0 end rescue => e # normal errors are rescued by output plugins in #try_flush # so this rescue section is for critical & unrecoverable errors log.error "error on output thread", error: e log.error_backtrace raise ensure state.mutex.unlock end end
Only for tests of output plugins.
# File lib/fluent/plugin/output.rb, line 1433
# Test helper: wakes every live flush thread. It zeroes next_clock so that
# each thread flushes immediately, and signals its condition variable.
def flush_thread_wakeup
  @output_flush_threads.each do |state|
    state.mutex.synchronize do
      thread = state.thread
      next unless thread && thread.status

      state.next_clock = 0
      state.cond_var.signal
    end
    Thread.pass
  end
end
# File lib/fluent/plugin/output.rb, line 1401
# Enqueues every staged chunk (forcefully) and asks the flush threads to
# flush everything. Does nothing for non-buffered plugins.
def force_flush
  return unless @buffering

  @buffer.enqueue_all(true)
  submit_flush_all
end
# File lib/fluent/plugin/output.rb, line 126
# Custom per-event formatting hook used by buffered plugins. Plugins that do
# not implement it get the standard msgpack event-stream chunk instead.
#
# @raise [NotImplementedError] always, in this base implementation
def format(tag, time, record)
  message = "BUG: output plugins MUST implement this method"
  raise NotImplementedError, message
end
Compatibility for existing plugins
# File lib/fluent/plugin/output.rb, line 138
# Compatibility wrapper for existing plugins. It delegates dynamically to
# #formatted_to_msgpack_binary?, so subclass overrides of the predicate are
# honoured.
def formatted_to_msgpack_binary
  result = formatted_to_msgpack_binary?
  result
end
# File lib/fluent/plugin/output.rb, line 131
# Whether the custom #format returns msgpack binary. Plugins whose #format
# produces msgpack should override this to return true; the base
# implementation returns false.
def formatted_to_msgpack_binary?
  false
end
# File lib/fluent/plugin/output.rb, line 1021
# Picks the standard msgpack-stream format proc. The choice depends on
# whether the buffer gzip-compresses chunks and whether event times are
# written as integers.
def generate_format_proc
  gzip = @buffer && @buffer.compress == :gzip
  if @time_as_integer
    gzip ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM_TIME_INT
  else
    gzip ? FORMAT_COMPRESSED_MSGPACK_STREAM : FORMAT_MSGPACK_STREAM
  end
end
# File lib/fluent/plugin/output.rb, line 793
# Names of the chunk-key placeholders (${name}) used in +str+, sorted, with
# the reserved ${tag} and ${chunk_id} placeholders left out.
def get_placeholders_keys(str)
  reserved = %w[tag chunk_id]
  str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN)
     .map(&:first)
     .reject { |name| reserved.include?(name) }
     .sort
end
-1 means the whole tag (the ${tag} placeholder).
# File lib/fluent/plugin/output.rb, line 780
# Sorted tag-part indexes referenced by tag placeholders in +str+:
# ${tag} yields -1 (the whole tag), and ${tag[N]} yields N (N may be
# negative).
def get_placeholders_tag(str)
  # scan result looks like [["tag"], ["tag[0]"]]
  placeholders = str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).map(&:first)
  indexes = placeholders.each_with_object([]) do |placeholder, acc|
    if placeholder == "tag"
      acc << -1
    elsif (m = placeholder.match(/^tag\[(-?\d+)\]$/))
      acc << m[1].to_i
    end
  end
  indexes.sort
end
Using a timekey larger than one day is not validated.
# File lib/fluent/plugin/output.rb, line 770
# Finds the finest time unit that +str+ (a strftime template) depends on.
# It tries each threshold from TIME_KEY_PLACEHOLDER_THRESHOLDS in order and
# returns the first one whose offset from the base time changes the
# formatted string; returns nil if none does.
def get_placeholders_time(str)
  base_str = TIMESTAMP_CHECK_BASE_TIME.strftime(str)
  TIME_KEY_PLACEHOLDER_THRESHOLDS.find do |sec, _title, _example|
    (TIMESTAMP_CHECK_BASE_TIME + sec).strftime(str) != base_str
  end
end
# File lib/fluent/plugin/output.rb, line 1355
# Called when retries are exhausted. It logs the final error (if any) with
# the number of queued records, drops the whole buffer queue, and clears the
# retry state.
def handle_limit_reached(error)
  if error
    queued = @buffer.queued_records
    log.error "Hit limit for retries. dropping all chunks in the buffer queue.",
              retry_times: @retry.steps, records: queued, error: error
    log.error_backtrace error.backtrace
  end

  @buffer.clear_queue!
  log.debug "buffer queue cleared"
  @retry = nil
end
# File lib/fluent/plugin/output.rb, line 1076
# Writes +es+ to the buffer when no per-event chunking is needed: no chunk
# keys, at most a tag key. A single metadata entry is used. With a custom
# #format, events are formatted here and nil results are skipped;
# otherwise the whole stream is written with the standard format proc.
#
# Metrics: emit_records grows by es.size (all incoming events, including any
# that #format dropped). emit_size grows by the msgpack byte size of +es+
# when size metrics are enabled.
# @return [true]
def handle_stream_simple(tag, es, enqueue: false)
  format_proc = nil
  meta = metadata((@chunk_key_tag ? tag : nil), nil, nil)
  if @custom_format
    data = []
    es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
      res = format(tag, time, record)
      data << res if res
    end
  else
    format_proc = generate_format_proc
    data = es
  end
  write_guard do
    @buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
  end
  # The old unused `records` counter was removed; the metric deliberately
  # keeps counting es.size.
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
metadata_and_data is a Hash of:
(standard format) metadata => event stream; (custom format) metadata => array of formatted events
For the standard format, formatting should be done on the whole event stream, but
the "whole event stream" here may be only a part of "es" when "es" is bigger than chunk_limit_size; `@buffer.write` does this splitting.
For a custom format, formatting is done here. Custom formatting always requires
iterating over the event stream, and for performance it should be done only once, even if the total event stream size exceeds chunk_limit_size.
# File lib/fluent/plugin/output.rb, line 1038
# Formats each event with the plugin's #format and groups the results by
# chunk metadata; nil results are skipped. Everything is then written to the
# buffer in one call, so the event stream is iterated only once.
#
# Metrics: emit_records grows by es.size (all incoming events, including any
# that #format dropped). emit_size grows by the msgpack byte size of +es+
# when size metrics are enabled.
# @return [true]
def handle_stream_with_custom_format(tag, es, enqueue: false)
  meta_and_data = {}
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    # The metadata key is registered even if #format drops the event.
    meta_and_data[meta] ||= []
    res = format(tag, time, record)
    meta_and_data[meta] << res if res
  end
  write_guard do
    @buffer.write(meta_and_data, enqueue: enqueue)
  end
  # The old unused `records` counter was removed; the metric deliberately
  # keeps counting es.size.
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
# File lib/fluent/plugin/output.rb, line 1058
# Groups the events of +es+ by chunk metadata into MultiEventStreams and
# writes them to the buffer in one call. The buffer applies the msgpack
# format proc and may split the data by chunk_limit_size.
#
# Metrics: emit_records grows by es.size. emit_size grows by the msgpack
# byte size of +es+ when size metrics are enabled.
# @return [true]
def handle_stream_with_standard_format(tag, es, enqueue: false)
  format_proc = generate_format_proc
  meta_and_data = {}
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    (meta_and_data[meta] ||= MultiEventStream.new).add(time, record)
  end
  write_guard do
    @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
  end
  # The old unused `records` counter was removed; every event is counted.
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
# File lib/fluent/plugin/output.rb, line 651
# Whether this plugin's own class implements +feature+. Each feature maps to
# a marker method defined directly on the class: :synchronous => #process,
# :buffered => #write, :delayed_commit => #try_write,
# :custom_format => #format. Except for :delayed_commit, v0.12-style
# plugins are also accepted via support_in_v12_style?.
#
# @raise [ArgumentError] for an unknown feature
def implement?(feature)
  marker = { synchronous: :process, buffered: :write, delayed_commit: :try_write, custom_format: :format }.fetch(feature) do
    raise ArgumentError, "Unknown feature for output plugin: #{feature}"
  end
  return true if self.class.instance_methods(false).include?(marker)

  # try_write has no v0.12-style counterpart
  feature != :delayed_commit && support_in_v12_style?(feature)
end
Only for tests of output plugins.
# File lib/fluent/plugin/output.rb, line 1416
# Test helper: pauses enqueueing. While this flag is set, the enqueue
# thread only sleeps; #enqueue_thread_wait clears it again.
def interrupt_flushes
  @output_flush_interrupted = true
end
# File lib/fluent/plugin/output.rb, line 441
# Copies the buffer's disable_chunk_backup setting back into @buffer_config.
# Some plugins read `@buffer_config.disable_chunk_backup` directly, as
# before.
def keep_buffer_config_compat
  backup_disabled = @buffer.disable_chunk_backup
  @buffer_config[:disable_chunk_backup] = backup_disabled
end
# File lib/fluent/plugin/output.rb, line 1344
# Logs a failed flush attempt, with retry count, next retry time and chunk
# id, plus the error backtrace. Does nothing when +error+ is nil.
def log_retry_error(error, chunk_id_hex, using_secondary)
  return unless error

  msg = using_secondary ? "failed to flush the buffer with secondary output." : "failed to flush the buffer."
  log.warn(msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error)
  log.warn_backtrace(error.backtrace)
end
TODO: optimize this code
# File lib/fluent/plugin/output.rb, line 909
# Builds the buffer metadata (chunk identity) for an event.
#
# Arguments follow output-plugin order (tag, time, record). The buffer's
# metadata takes keywords (timekey, tag, variables) instead. Only the parts
# selected by the chunk keys are filled in.
#
# @raise [ArgumentError] when tag / time / record have unexpected types
def metadata(tag, time, record)
  raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String)
  raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer)
  raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash)

  # for tests: no chunking settings at all yet
  return Struct.new(:timekey, :tag, :variables).new if @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil?

  # timekey is an int from epoch, and `timekey - timekey % 60` is assumed to
  # match 0s of each minute. This is wrong for timezones with leap seconds,
  # which are rare enough to ignore in production systems.
  unless @chunk_keys.empty?
    timekey = @chunk_key_time ? calculate_timekey(time) : nil
    variables = @chunk_key_accessors.map { |key, accessor| [key, accessor.call(record)] }.to_h
    return @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: variables)
  end

  case [!!@chunk_key_time, !!@chunk_key_tag]
  when [false, false] then @buffer.metadata()
  when [true, true] then @buffer.metadata(timekey: calculate_timekey(time), tag: tag)
  when [true, false] then @buffer.metadata(timekey: calculate_timekey(time))
  else @buffer.metadata(tag: tag)
  end
end
# File lib/fluent/plugin/output.rb, line 154
# Whether this plugin can run with multiple workers. The base
# implementation answers false; plugins presumably override it to opt in.
def multi_workers_ready?
  false
end
# File lib/fluent/plugin/output.rb, line 1176
# Time of the next flush attempt:
# * nothing queued: now + flush_thread_interval;
# * chunks queued and a retry pending: the retry's next time;
# * chunks queued and no retry pending: now + flush_thread_burst_interval.
def next_flush_time
  return Time.now + @buffer_config.flush_thread_interval unless @buffer.queued?

  @retry_mutex.synchronize do
    if @retry
      @retry.next_time
    else
      Time.now + @buffer_config.flush_thread_burst_interval
    end
  end
end
# File lib/fluent/plugin/output.rb, line 174
# Current value of the num_errors metric, which is incremented whenever an
# emit path raises.
def num_errors
  metric = @num_errors_metrics
  metric.get
end
# File lib/fluent/plugin/output.rb, line 663
# Validates the placeholders of parameter +name+ (value +str+) against the
# chunk keys. Every validator from #placeholder_validators must pass; each
# validator's validate! raises on a mismatch.
def placeholder_validate!(name, str)
  placeholder_validators(name, str).each(&:validate!)
end
# File lib/fluent/plugin/output.rb, line 669
# Builds the PlaceholderValidators for parameter +name+ (value +str+). Each
# validator is added when the string uses that kind of placeholder, or when
# the corresponding chunk key is configured:
# * :time - a time placeholder, or a timekey;
# * :tag  - a tag placeholder, or a tag chunk key;
# * :keys - a chunk-key placeholder, or chunk keys.
def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
  validators = []

  sec, title, example = get_placeholders_time(str)
  validators << PlaceholderValidator.new(name, str, :time, { sec: sec, title: title, example: example, timekey: time_key }) if sec || time_key

  parts = get_placeholders_tag(str)
  validators << PlaceholderValidator.new(name, str, :tag, { parts: parts, tagkey: tag_key }) if tag_key || !parts.empty?

  keys = get_placeholders_keys(str)
  validators << PlaceholderValidator.new(name, str, :keys, { keys: keys, chunkkeys: chunk_keys }) if (chunk_keys && !chunk_keys.empty?) || !keys.empty?

  validators
end
# File lib/fluent/plugin/output.rb, line 142
# Consulted by #start when the buffering mode is still undecided.
# Override it to return false only when BOTH of these hold:
# * the plugin implements both buffered and non-buffered methods, and
# * it should run non-buffered when no <buffer> section is given.
def prefer_buffered_processing
  true
end
# File lib/fluent/plugin/output.rb, line 149
# When a plugin implements both `write` and `try_write`, #start uses this
# to pick between them: true selects `try_write` (delayed commit).
def prefer_delayed_commit
  true
end
# File lib/fluent/plugin/output.rb, line 114
# Synchronous (non-buffered) output of an event stream. Plugins supporting
# non-buffered mode implement it.
#
# @raise [NotImplementedError] always, in this base implementation
def process(tag, es)
  message = "BUG: output plugins MUST implement this method"
  raise NotImplementedError, message
end
# File lib/fluent/plugin/output.rb, line 1367
# Creates the :output_retries retry state from the <buffer> retry settings.
# When a <secondary> is configured, the state also carries the secondary
# switch-over threshold.
#
# @param randomize [Boolean] forwarded as the +randomize:+ option
def retry_state(randomize)
  options = {
    forever: @buffer_config.retry_forever,
    max_steps: @buffer_config.retry_max_times,
    backoff_base: @buffer_config.retry_exponential_backoff_base,
    max_interval: @buffer_config.retry_max_interval,
  }
  if @secondary
    options[:secondary] = true
    options[:secondary_threshold] = @buffer_config.retry_secondary_threshold
  end
  options[:randomize] = randomize

  retry_state_create(
    :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
    **options
  )
end
# File lib/fluent/plugin/output.rb, line 194
#
# Returns the current value of the rollback counter metric
# (incremented whenever a chunk is taken back into the buffer).
def rollback_count
  metric = @rollback_count_metrics
  metric.get
end
The `update_retry` parameter exists to prevent a busy loop caused by async writes. We plan to remove it by redesigning retry_state
management between threads.
# File lib/fluent/plugin/output.rb, line 1126
#
# Explicitly rolls back a dequeued chunk so that it goes back into the buffer
# queue. This API lets plugins roll back chunks themselves; 3rd party plugins
# can instead depend on the automatic rollback done by #try_rollback_write.
#
# chunk_id            - unique id of the chunk to roll back.
# legacy_update_retry - optional positional form of +update_retry+. The
#                       secondary delegate installed by #acts_as_secondary
#                       forwards `rollback_write(chunk_id, update_retry)`
#                       positionally, which raised ArgumentError against a
#                       keyword-only signature. When non-nil, it overrides the
#                       +update_retry+ keyword.
# update_retry        - when true (default), the primary's retry state is
#                       updated after a successful takeback. It exists to
#                       prevent a busy loop caused by async writes.
#
# Returns true if the chunk was rolled back as expected, or false if it was
# already flushed and could not be rolled back. In many cases false can simply
# be ignored.
def rollback_write(chunk_id, legacy_update_retry = nil, update_retry: true)
  update_retry = legacy_update_retry unless legacy_update_retry.nil?

  # Forget any pending delayed-commit bookkeeping for this chunk.
  @dequeued_chunks_mutex.synchronize do
    @dequeued_chunks.delete_if { |info| info.chunk_id == chunk_id }
  end

  return false unless @buffer.takeback_chunk(chunk_id)

  @rollback_count_metrics.inc
  if update_retry
    # Retry state always lives on the primary, even when called on a secondary.
    primary = @as_secondary ? @primary_instance : self
    primary.update_retry_state(chunk_id, @as_secondary)
  end
  true
end
Fluent::Plugin::Base#shutdown
# File lib/fluent/plugin/output.rb, line 556
#
# Shutdown phase. Shuts down the secondary output (if any) first, then the
# buffer (only when running buffered and a buffer exists), and finally
# delegates to the superclass.
def shutdown
  if @secondary
    @secondary.shutdown
  end
  if @buffer && @buffering
    @buffer.shutdown
  end
  super
end
Fluent::Plugin::Base#start
# File lib/fluent/plugin/output.rb, line 447
#
# Starts the output plugin. In order:
# 1. decides buffered vs. non-buffered mode if #initialize left it open;
# 2. binds #emit_events to #emit_buffered or #emit_sync on this instance;
# 3. for a buffered primary (not acting as secondary), starts the buffer,
#    creates the flush threads and, when required, the enqueue thread;
# 4. starts the secondary output, if one is configured.
def start
  super

  # @buffering is nil only when the plugin implements both synchronous and
  # buffered/delayed-commit methods; the plugin's preference decides here.
  if @buffering.nil?
    @buffering = prefer_buffered_processing
    if !@buffering && @buffer
      @buffer.terminate # it's not started, so terminate will be enough
      # At here, this plugin works as non-buffered plugin.
      # Un-assign @buffer not to show buffering metrics (e.g., in_monitor_agent)
      @buffer = nil
    end
  end

  if @buffering
    # Route #emit_events to #emit_buffered for this instance only.
    m = method(:emit_buffered)
    singleton_class.module_eval do
      define_method(:emit_events, m)
    end

    @custom_format = implement?(:custom_format)
    # Custom-format plugins stream msgpack only if formatted_to_msgpack_binary is truthy.
    @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
    # With both buffered and delayed-commit implementations, the plugin's
    # preference decides; otherwise it follows implement?(:delayed_commit).
    @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
                        prefer_delayed_commit
                      else
                        implement?(:delayed_commit)
                      end
    @delayed_commit_timeout = @buffer_config.delayed_commit_timeout
  else # !@buffering
    # Route #emit_events to #emit_sync for this instance only.
    m = method(:emit_sync)
    singleton_class.module_eval do
      define_method(:emit_events, m)
    end
  end

  # A secondary output does not start its own buffer or threads here.
  if @buffering && !@as_secondary
    @retry = nil
    @retry_mutex = Mutex.new

    @buffer.start

    @output_enqueue_thread = nil
    @output_enqueue_thread_running = true

    @output_flush_threads = []
    @output_flush_threads_mutex = Mutex.new
    @output_flush_threads_running = true

    # mainly for test: detect enqueue works as code below:
    #   @output.interrupt_flushes
    #   # emits
    #   @output.enqueue_thread_wait
    @output_flush_interrupted = false
    @output_enqueue_thread_mutex = Mutex.new
    @output_enqueue_thread_waiting = false

    # Bookkeeping for delayed-commit chunks handed to try_write.
    @dequeued_chunks = []
    @dequeued_chunks_mutex = Mutex.new

    # One flush thread per flush_thread_count. Each one gets a FlushThreadState
    # whose mutex/condition variable submit_flush_once uses to wake it.
    @output_flush_thread_current_position = 0
    @buffer_config.flush_thread_count.times do |i|
      thread_title = "flush_thread_#{i}".to_sym
      thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
      thread = thread_create(thread_title) do
        flush_thread_run(thread_state)
      end
      thread_state.thread = thread
      @output_flush_threads_mutex.synchronize do
        @output_flush_threads << thread_state
      end
    end

    # The enqueue thread is started only for flush_mode :interval or
    # time-keyed chunking, and never under plugin development.
    if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time)
      @output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
    end
  end
  @secondary.start if @secondary
end
# File lib/fluent/plugin/output.rb, line 1579
#
# Returns a snapshot of this output's metrics as
#   { 'output' => { metric_name => value, ... } }
# When the buffer exposes #statistics, each entry of its 'buffer' section is
# copied in as well, renamed via BUFFER_STATS_KEYS.
def statistics
  output_stats = {
    'emit_records' => @emit_records_metrics.get,
    'emit_size' => @emit_size_metrics.get,
    # Respect original name
    # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284
    'retry_count' => @num_errors_metrics.get,
    'emit_count' => @emit_count_metrics.get,
    'write_count' => @write_count_metrics.get,
    'rollback_count' => @rollback_count_metrics.get,
    'slow_flush_count' => @slow_flush_count_metrics.get,
    'flush_time_count' => @flush_time_count_metrics.get,
  }

  if @buffer&.respond_to?(:statistics)
    buffer_section = @buffer.statistics['buffer'] || {}
    buffer_section.each { |stat_name, stat_value| output_stats[BUFFER_STATS_KEYS[stat_name]] = stat_value }
  end

  { 'output' => output_stats }
end
Fluent::PluginId#stop
# File lib/fluent/plugin/output.rb, line 530
#
# Stop phase. Stops the secondary output (if any) first, then the buffer
# (only when running buffered and a buffer exists), and finally delegates to
# the superclass.
def stop
  if @secondary
    @secondary.stop
  end
  if @buffer && @buffering
    @buffer.stop
  end
  super
end
# File lib/fluent/plugin/output.rb, line 1408
#
# Keeps waking flush threads while the buffer still has queued chunks and no
# retry is pending. Sleeps flush_thread_burst_interval between wake-ups.
def submit_flush_all
  loop do
    break if @retry || !@buffer.queued?

    submit_flush_once
    sleep @buffer_config.flush_thread_burst_interval
  end
end
# File lib/fluent/plugin/output.rb, line 1386
#
# Advances round-robin to the next flush thread and signals it to flush.
# Logs a warning instead if that thread is no longer alive. Yields the CPU
# afterwards with Thread.pass.
def submit_flush_once
  # Lock-free on purpose: a rough choice of the "next" writer is good enough.
  thread_count = @buffer_config.flush_thread_count
  @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % thread_count
  target = @output_flush_threads[@output_flush_thread_current_position]

  target.mutex.synchronize do
    worker = target.thread
    # Thread#status: "run"/"sleep"/"aborting" while alive, false when it
    # stopped normally, nil when it was killed by an exception.
    if worker && worker.status
      # NOTE(review): next_clock = 0 presumably asks flush_thread_run to flush now.
      target.next_clock = 0
      target.cond_var.signal
    else
      log.warn "thread is already dead"
    end
  end
  Thread.pass
end
# File lib/fluent/plugin/output.rb, line 639
#
# Feature support table for plugins written in the v0.12 style: none of the
# known features (:synchronous, :buffered, :delayed_commit, :custom_format)
# is supported, so they all answer false.
#
# Raises ArgumentError for any other feature name.
def support_in_v12_style?(feature)
  known_features = %i[synchronous buffered delayed_commit custom_format]
  return false if known_features.include?(feature)

  raise ArgumentError, "unknown feature: #{feature}"
end
# File lib/fluent/plugin/output.rb, line 630
#
# Runs the given block, serialized across flush threads. The shared
# @flush_thread_mutex is taken only when more than one flush thread is
# configured; otherwise the block runs directly.
# Returns the block's value.
def synchronize_in_threads
  return yield unless actual_flush_thread_count > 1

  @flush_thread_mutex.synchronize { yield }
end
Ensures that `path` (a filename or file path) can be processed only by the current thread in the current process. For multiple workers, the lock is shared if `path` is the same value. For multiple threads, the lock is shared by all threads in the same process.
# File lib/fluent/plugin/output.rb, line 613
#
# Runs the block while holding both locks for +path+: first the cross-worker
# lock (see #synchronize_path_in_workers), then the in-process flush-thread
# lock (see #synchronize_in_threads). Returns the block's value.
def synchronize_path(path, &block)
  synchronize_path_in_workers(path) do
    synchronize_in_threads(&block)
  end
end
# File lib/fluent/plugin/output.rb, line 621
#
# Runs the block under the per-path worker lock (acquire_worker_lock) when
# more than one worker is configured; with a single worker, the block runs
# directly. Returns the block's value.
def synchronize_path_in_workers(path)
  return yield unless system_config.workers > 1

  acquire_worker_lock(path) { yield }
end
Fluent::PluginLoggerMixin#terminate
# File lib/fluent/plugin/output.rb, line 596
#
# Terminate phase. Unlike stop/shutdown, the buffer goes first (only when
# running buffered and a buffer exists), then the secondary output (if any),
# and finally the superclass.
def terminate
  if @buffer && @buffering
    @buffer.terminate
  end
  if @secondary
    @secondary.terminate
  end
  super
end
# File lib/fluent/plugin/output.rb, line 1188
#
# Dequeues one chunk and flushes it. It goes to the primary output, or to the
# secondary when the current retry state says to use it.
#
# * Delayed-commit outputs: the chunk is recorded in @dequeued_chunks and
#   passed to #try_write; committing happens later.
# * Other outputs: the chunk is written with #write and committed at once.
# * UNRECOVERABLE_ERRORS skip retries. The chunk is written directly to a
#   sync secondary of a different class when possible; otherwise it goes to
#   #backup_chunk.
# * Any other StandardError takes the chunk back into the queue and updates
#   the retry state. It is re-raised only under plugin development when
#   @retry_for_error_chunk is false.
#
# Returns nil immediately when no chunk is queued.
def try_flush
  chunk = @buffer.dequeue_chunk
  return unless chunk

  # Resolve the id before any branching. The unrecoverable-error path commits
  # the chunk after writing it to the secondary. That commit previously used
  # a `chunk_id` assigned only in the non-delayed branch, so an async primary
  # committed `nil`: the chunk was never purged and was rolled back later.
  chunk_id = chunk.unique_id
  dump_chunk_id = dump_unique_id_hex(chunk_id)

  log.on_trace { log.trace "trying flush for a chunk", chunk: dump_chunk_id }

  output = self
  using_secondary = false
  if @retry_mutex.synchronize { @retry && @retry.secondary? }
    output = @secondary
    using_secondary = true
  end

  if @enable_msgpack_streamer
    chunk.extend ChunkMessagePackEventStreamer
  end

  begin
    chunk_write_start = Fluent::Clock.now

    if output.delayed_commit
      log.trace "executing delayed write and commit", chunk: dump_chunk_id
      @write_count_metrics.inc
      @dequeued_chunks_mutex.synchronize do
        # delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
        @dequeued_chunks << DequeuedChunkInfo.new(chunk_id, Time.now, self.delayed_commit_timeout)
      end

      output.try_write(chunk)
      check_slow_flush(chunk_write_start)
    else # output plugin without delayed purge
      log.trace "adding write count", instance: self.object_id
      @write_count_metrics.inc
      log.trace "executing sync write", chunk: dump_chunk_id

      output.write(chunk)
      check_slow_flush(chunk_write_start)

      log.trace "write operation done, committing", chunk: dump_chunk_id
      commit_write(chunk_id, delayed: false, secondary: using_secondary)
      log.trace "done to commit a chunk", chunk: dump_chunk_id
    end
  rescue *UNRECOVERABLE_ERRORS => e
    if @secondary
      if using_secondary
        log.warn "got unrecoverable error in secondary.", error: e
        log.warn_backtrace
        backup_chunk(chunk, using_secondary, output.delayed_commit)
      elsif self.class == @secondary.class
        log.warn "got unrecoverable error in primary and secondary type is same as primary. Skip secondary", error: e
        log.warn_backtrace
        backup_chunk(chunk, using_secondary, output.delayed_commit)
      elsif @secondary.delayed_commit
        # Call secondary output directly without retry update.
        # In this case, delayed commit causes inconsistent state in dequeued chunks so async output in secondary is not allowed for now.
        log.warn "got unrecoverable error in primary and secondary is async output. Skip secondary for backup", error: e
        log.warn_backtrace
        backup_chunk(chunk, using_secondary, output.delayed_commit)
      else
        log.warn "got unrecoverable error in primary. Skip retry and flush chunk to secondary", error: e
        log.warn_backtrace
        begin
          @secondary.write(chunk)
          commit_write(chunk_id, delayed: output.delayed_commit, secondary: true)
        rescue => e
          log.warn "got an error in secondary for unrecoverable error", error: e
          log.warn_backtrace
          backup_chunk(chunk, using_secondary, output.delayed_commit)
        end
      end
    else
      log.warn "got unrecoverable error in primary and no secondary", error: e
      log.warn_backtrace
      backup_chunk(chunk, using_secondary, output.delayed_commit)
    end
  rescue => e
    log.debug "taking back chunk for errors.", chunk: dump_chunk_id
    if output.delayed_commit
      @dequeued_chunks_mutex.synchronize do
        @dequeued_chunks.delete_if { |d| d.chunk_id == chunk_id }
      end
    end

    if @buffer.takeback_chunk(chunk_id)
      @rollback_count_metrics.inc
    end

    update_retry_state(chunk_id, using_secondary, e)

    raise if @under_plugin_development && !@retry_for_error_chunk
  end
end
# File lib/fluent/plugin/output.rb, line 1161
#
# Rolls back every chunk still waiting for a delayed commit (used when
# shutting down). Each pending entry is removed from @dequeued_chunks. Chunks
# that the buffer takes back are counted, logged, and reported to the
# primary's retry state. Does nothing if @dequeued_chunks was never set up.
def try_rollback_all
  return unless @dequeued_chunks

  @dequeued_chunks_mutex.synchronize do
    loop do
      break if @dequeued_chunks.empty?

      pending = @dequeued_chunks.shift
      next unless @buffer.takeback_chunk(pending.chunk_id)

      @rollback_count_metrics.inc
      log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(pending.chunk_id)
      owner = @as_secondary ? @primary_instance : self
      owner.update_retry_state(pending.chunk_id, @as_secondary)
    end
  end
end
# File lib/fluent/plugin/output.rb, line 1147
#
# Rolls back delayed-commit chunks whose commit deadline has expired. Entries
# are taken from the front of @dequeued_chunks until the first one that has
# not expired. Each taken-back chunk is counted, logged as a warning, and
# reported to the primary's retry state.
def try_rollback_write
  @dequeued_chunks_mutex.synchronize do
    loop do
      head = @dequeued_chunks.first
      break unless head && head.expired?

      @dequeued_chunks.shift
      next unless @buffer.takeback_chunk(head.chunk_id)

      @rollback_count_metrics.inc
      log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(head.chunk_id), flushed_at: head.time
      owner = @as_secondary ? @primary_instance : self
      owner.update_retry_state(head.chunk_id, @as_secondary)
    end
  end
end
# File lib/fluent/plugin/output.rb, line 122
#
# Delayed-commit write entry point for +chunk+.
# Output plugins must override this. The base version only raises
# NotImplementedError.
def try_write(chunk)
  raise(NotImplementedError, "BUG: output plugins MUST implement this method")
end
# File lib/fluent/plugin/output.rb, line 1309
#
# Records one more failed flush of +chunk_id+ under @retry_mutex.
#
# On the first failure, a new retry state is created via #retry_state.
# Later failures call @retry.step only once @retry.next_time has been reached,
# so several flush threads failing at nearly the same moment do not step it
# once each. Before that time, @retry.recalc_next_time is called instead.
# Afterwards, #handle_limit_reached is called if the retry limit is hit;
# otherwise +error+ (if given) is logged via #log_retry_error.
#
# Returns nil after the first failure; otherwise returns whatever the
# limit/log handling returned.
def update_retry_state(chunk_id, using_secondary, error = nil)
  @retry_mutex.synchronize do
    @num_errors_metrics.inc
    hex_id = dump_unique_id_hex(chunk_id)

    report_outcome = lambda do
      if @retry.limit?
        handle_limit_reached(error)
      elsif error
        log_retry_error(error, hex_id, using_secondary)
      end
    end

    if @retry
      if Time.now < @retry.next_time
        @retry.recalc_next_time # to prevent all flush threads from retrying at the same time
      else
        @retry.step
      end
      report_outcome.call
    else
      @retry = retry_state(@buffer_config.retry_randomize)
      report_outcome.call
      nil
    end
  end
end
# File lib/fluent/plugin/output.rb, line 118
#
# Synchronous-commit write entry point for +chunk+.
# Output plugins must override this. The base version only raises
# NotImplementedError.
def write(chunk)
  raise(NotImplementedError, "BUG: output plugins MUST implement this method")
end
# File lib/fluent/plugin/output.rb, line 190
#
# Returns the current value of the write counter metric
# (incremented once per write/try_write attempt in try_flush).
def write_count
  metric = @write_count_metrics
  metric.get
end
# File lib/fluent/plugin/output.rb, line 975
#
# Runs the given block (normally a buffer write). When it raises
# Fluent::Plugin::Buffer::BufferOverflowError, applies the configured
# `overflow_action`:
#
# * :throw_exception   - re-raises the overflow error.
# * :block             - polls every second until the buffer is storable,
#                        then retries the block; re-raises if the plugin is
#                        stopped while waiting.
# * :drop_oldest_chunk - dequeues and purges the oldest queued chunk
#                        (best-effort), then retries the block if the buffer
#                        is storable, otherwise re-raises.
#
# Any other overflow_action raises a RuntimeError ("BUG: ...").
# Returns the block's value once it succeeds.
def write_guard(&block)
  begin
    block.call
  rescue Fluent::Plugin::Buffer::BufferOverflowError
    log.warn "failed to write data into buffer by buffer overflow", action: @buffer_config.overflow_action
    case @buffer_config.overflow_action
    when :throw_exception
      raise
    when :block
      log.debug "buffer.write is now blocking"
      until @buffer.storable?
        if self.stopped?
          log.error "breaking block behavior to shutdown Fluentd"
          # to break infinite loop to exit Fluentd process
          raise
        end
        log.trace "sleeping until buffer can store more data"
        sleep 1
      end
      log.debug "retrying buffer.write after blocked operation"
      retry
    when :drop_oldest_chunk
      begin
        oldest = @buffer.dequeue_chunk
        if oldest
          log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id)
          @buffer.purge_chunk(oldest.unique_id)
        else
          log.error "no queued chunks to be dropped for drop_oldest_chunk"
        end
      rescue => e
        # Dropping stays best-effort (the error is not propagated), but it is
        # logged instead of silently ignored: a failed dequeue/purge would
        # otherwise leave no trace at all.
        log.warn "failed to drop oldest chunk after buffer overflow", error: e
      end
      # Re-raises the original BufferOverflowError when dropping did not help.
      raise unless @buffer.storable?
      retry
    else
      raise "BUG: unknown overflow_action '#{@buffer_config.overflow_action}'"
    end
  end
end