class Fluent::OutputThread

Public Class Methods

new(output) click to toggle source
# File lib/fluent/output.rb, line 107
# Prepares the bookkeeping for a flush worker bound to +output+.
# Only instance state is set up here; no thread, mutex or condition
# variable is created by this method.
def initialize(output)
  # Keep the output and start with the stop flag cleared.
  @output, @finish = output, false
  # The first flush becomes due one second from now (epoch seconds, Float).
  @next_time = 1.0 + Time.now.to_f
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/output.rb, line 113
# Accepts a configuration object and ignores it: this method has no
# settings to read. Always returns nil.
def configure(conf)
  nil
end
shutdown() click to toggle source
# File lib/fluent/output.rb, line 122
# Requests the worker thread to stop and blocks until it has exited.
# Returns whatever Thread#join returns for the worker thread.
def shutdown
  # Raise the stop flag first; #run tests it at the top of every loop pass.
  @finish = true
  # Wake the worker in case it is currently parked on the condition variable.
  @mutex.synchronize do
    @cond.signal
  end
  # Offer the scheduler a chance to run the worker before we block on it.
  Thread.pass
  @thread.join
end
start() click to toggle source
# File lib/fluent/output.rb, line 116
# Creates the synchronization primitives and launches the worker thread,
# which executes #run. Returns the newly created Thread.
def start
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  # The block takes no arguments and simply delegates to #run.
  @thread = Thread.new { run }
end
submit_flush() click to toggle source
# File lib/fluent/output.rb, line 131
# Requests a flush as soon as possible: marks the next flush time as
# already due and wakes the worker if it is waiting. Returns nil.
def submit_flush
  @mutex.synchronize do
    # Zero is never later than the current epoch time, so #run treats
    # the flush as due on its next loop pass.
    @next_time = 0
    @cond.signal
  end
  # Yield so the worker can pick the request up promptly.
  Thread.pass
end

Private Instance Methods

cond_wait(sec) click to toggle source
# File lib/fluent/output.rb, line 173
# Waits on the condition variable for at most +sec+ seconds, or until it
# is signalled. The caller must already hold @mutex; the wait releases it
# while sleeping and re-acquires it before returning.
def cond_wait(sec)
  lock = @mutex
  @cond.wait(lock, sec)
end
run() click to toggle source
# File lib/fluent/output.rb, line 140
# Worker loop executed on the thread created by #start.
#
# Holding @mutex, it repeats until @finish is set: when @next_time is due,
# it releases the lock, calls @output.try_flush and stores the result as
# the new @next_time; it then waits on the condition variable until the
# next flush time or a signal from #submit_flush / #shutdown.
#
# A StandardError raised inside the loop is logged through $log and
# re-raised. In every case @output.before_shutdown is invoked under the
# mutex before the method returns.
def run
  @mutex.lock
  begin
    until @finish
      time = Time.now.to_f

      if @next_time <= time
        # Flush without holding the lock so #submit_flush and #shutdown
        # are not blocked for the duration of the flush.
        @mutex.unlock
        begin
          @next_time = @output.try_flush
        ensure
          # Always re-acquire: the outer ensure unconditionally unlocks.
          @mutex.lock
        end
        # The flush may have taken a while; measure the wait from now.
        next_wait = @next_time - Time.now.to_f
      else
        next_wait = @next_time - time
      end

      # #shutdown may have set @finish and signalled while the lock was
      # released for the flush. Nobody was waiting then, so that signal is
      # gone; waiting here anyway would stall shutdown for the whole
      # interval. Re-check the flag before sleeping.
      cond_wait(next_wait) if next_wait > 0 && !@finish
    end
  ensure
    @mutex.unlock
  end
rescue
  $log.error "error on output thread", error: $!.to_s
  $log.error_backtrace
  raise
ensure
  @mutex.synchronize {
    @output.before_shutdown
  }
end