# File lib/fluent/buffer.rb, line 128
#
# Creates an unstarted buffer. Both chunk containers are left nil here;
# #start assigns them from the result of #resume.
def initialize
  super
  @parallel_pop = true  # consulted by #pop when choosing a queued chunk
  @queue = nil          # chunks to be flushed
  @map = nil            # chunks to store data
end
# File lib/fluent/buffer.rb, line 328
#
# Purges every queued chunk and drops it from the flush queue.
# Chunks still held in @map are not touched.
#
# Returns the (now empty) queue.
def clear!
  @queue.delete_if do |queued|
    queued.purge
    true # always remove the entry, whatever #purge returned
  end
end
# File lib/fluent/buffer.rb, line 150
#
# Hands the configuration straight to the superclass implementation;
# this class adds no processing of its own.
#
# conf - configuration object, forwarded unchanged.
def configure(conf)
  super(conf)
end
# File lib/fluent/buffer.rb, line 176 def emit(key, data, chain) key = key.to_s synchronize do # chunk unique id is generated in #new_chunk chunk = (@map[key] ||= new_chunk(key)) if storable?(chunk, data) chain.next chunk << data return false elsif @queue.size >= @buffer_queue_limit raise BufferQueueLimitError, "queue size exceeds limit" end if data.bytesize > @buffer_chunk_limit $log.warn "Size of the emitted data exceeds buffer_chunk_limit." $log.warn "This may occur problems in the output plugins ``at this server.``" $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit" $log.warn "in the forward output ``at the log forwarding server.``" ### TODO # raise BufferChunkLimitError, "received data too large" end # chunk unique id is generated in #new_chunk nc = new_chunk(key) ok = false begin nc << data chain.next flush_trigger = false @queue.synchronize { enqueue(chunk) # this is buffer enqueue *hook* flush_trigger = @queue.empty? @queue << chunk # actual enqueue @map[key] = nc } ok = true # false: queue have 1 or more chunks before this emit # so this enqueue is not a trigger to flush # true: queue have no chunks before this emit # so this enqueue is a trigger to flush this buffer ASAP return flush_trigger ensure nc.purge unless ok end end # synchronize end
# File lib/fluent/buffer.rb, line 135
#
# Switches the chunk-selection mode used by #pop.
#
# b - value stored in @parallel_pop (defaults to true). When truthy, #pop
#     takes the first queued chunk it can lock; otherwise it only tries
#     the head of the queue.
#
# Returns the value that was stored.
def enable_parallel(b = true)
  @parallel_pop = b
end
Enqueueing is done by #push; this method is actually an 'enqueue_hook'.
# File lib/fluent/buffer.rb, line 263
#
# Hook called by #emit and #push immediately before a chunk is appended
# to the queue. This base version is abstract.
#
# chunk - the chunk about to be queued.
#
# Raises NotImplementedError always; child classes supply the behavior.
def enqueue(chunk)
  raise(NotImplementedError, "Implement this method in child class")
end
# File lib/fluent/buffer.rb, line 230
#
# Lists the keys that currently have a staged chunk in @map.
# No lock is taken here.
#
# Returns an Array of keys.
def keys
  @map.keys
end
# File lib/fluent/buffer.rb, line 253
#
# Factory for the chunk that will be staged under +key+; #emit calls it
# whenever it needs a chunk. This base version is abstract.
#
# key - the key the new chunk belongs to.
#
# Raises NotImplementedError always; child classes supply the behavior.
def new_chunk(key)
  raise(NotImplementedError, "Implement this method in child class")
end
Shift a chunk from the queue, write it, and purge it. Returns a boolean indicating whether this buffer has more chunks to be flushed.
# File lib/fluent/buffer.rb, line 287 def pop(out) chunk = nil @queue.synchronize do if @parallel_pop chunk = @queue.find {|c| c.try_mon_enter } return false unless chunk else chunk = @queue.first return false unless chunk return false unless chunk.try_mon_enter end end begin # #push(key) does not push empty chunks into queue. # so this check is nonsense... if !chunk.empty? write_chunk(chunk, out) end queue_empty = false @queue.synchronize do @queue.delete_if {|c| c.object_id == chunk.object_id } queue_empty = @queue.empty? end chunk.purge # return to be flushed once more immediately, or not return !queue_empty ensure chunk.mon_exit end end
Get the chunk specified by key and push it into the queue.
# File lib/fluent/buffer.rb, line 268
#
# Moves the chunk staged under +key+ from @map to the flush queue.
#
# key - looked up in @map exactly as given (no conversion is applied).
#
# Returns false when there is no chunk for the key or the chunk is empty;
# returns true once the chunk has been queued.
def push(key)
  synchronize do
    staged = @map[key]
    # Nothing to queue: missing or empty chunks stay where they are.
    return false unless staged && !staged.empty?

    @queue.synchronize do
      enqueue(staged)   # enqueue hook
      @queue << staged  # actual enqueue
      @map.delete(key)
    end

    return true
  end # synchronize
end
# File lib/fluent/buffer.rb, line 234
#
# Reports how many chunks are waiting in the flush queue.
# No lock is taken here.
#
# Returns the size of @queue.
def queue_size
  @queue.size
end
# File lib/fluent/buffer.rb, line 257
#
# Supplies the initial buffer state for #start, which destructures the
# result into the queue and the map, in that order. This base version is
# abstract.
#
# Raises NotImplementedError always; child classes supply the behavior.
def resume
  raise(NotImplementedError, "Implement this method in child class")
end
# File lib/fluent/buffer.rb, line 159
#
# Closes every chunk the buffer holds. Queued chunks are removed from the
# queue one by one and closed; staged chunks are closed but stay in @map.
# Runs under the buffer lock, with the queue lock held while draining.
def shutdown
  synchronize do
    @queue.synchronize do
      # Each chunk is taken off the queue before it is closed.
      @queue.shift.close until @queue.empty?
    end

    @map.each_value { |staged| staged.close }
  end
end
# File lib/fluent/buffer.rb, line 154
#
# Loads the buffer state from #resume and makes the queue lockable.
# #resume is expected to return the queue first and the map second.
#
# Returns the queue, now extended with MonitorMixin.
def start
  restored_queue, restored_map = resume
  @queue = restored_queue
  @map = restored_map
  # The other methods guard queue access with @queue.synchronize.
  @queue.extend(MonitorMixin)
end
# File lib/fluent/buffer.rb, line 172
#
# Decides whether +data+ still fits into +chunk+.
#
# chunk - #size is called on it for the current size.
# data  - #bytesize is called on it for the size to add.
#
# Returns true when the combined size does not exceed @buffer_chunk_limit.
def storable?(chunk, data)
  size_after_append = chunk.size + data.bytesize
  size_after_append <= @buffer_chunk_limit
end
# File lib/fluent/buffer.rb, line 238
#
# Adds up the #size of every chunk the buffer holds: first the staged
# chunks in @map, then the chunks in the flush queue. The buffer lock is
# held throughout and the queue lock while the queue is read.
#
# Returns the combined size.
def total_queued_chunk_size
  bytes = 0
  synchronize do
    bytes = @map.each_value.inject(bytes) { |acc, chunk| acc + chunk.size }
    @queue.synchronize do
      bytes = @queue.inject(bytes) { |acc, chunk| acc + chunk.size }
    end
  end
  bytes
end
# File lib/fluent/buffer.rb, line 324
#
# Delivers one chunk to the write target; #pop calls this for each
# non-empty chunk it takes from the queue.
#
# chunk - the chunk to write.
# out   - target object; its #write method receives the chunk.
#
# Returns whatever out.write returns.
def write_chunk(chunk, out)
  out.write(chunk)
end