class LogStash::Outputs::Cassandra::Buffer

Public Class Methods

new(logger, max_size, flush_interval, &block) click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 7
# Builds a buffer that hands batches of items to the given block.
#
# @param logger logger used by the interval flusher (debug?/debug/warn are called on it)
# @param max_size buffered item count at which a push triggers a flush
# @param flush_interval seconds since the last flush after which the
#   background thread flushes
# @param block called with the internal buffer array on every non-empty flush
def initialize(logger, max_size, flush_interval, &block)
  # Collaborators and configuration.
  @logger = logger
  @max_size = max_size
  @flush_interval = flush_interval
  @submit_proc = block

  # Acquire this mutex around anything that modifies buffer state.
  @operations_mutex = Mutex.new
  # NOTE(review): not referenced by any other method visible here - confirm it is still needed.
  @operations_lock = java.util.concurrent.locks.ReentrantLock.new
  @stopping = Concurrent::AtomicBoolean.new(false)

  # Mutable state guarded by @operations_mutex.
  @buffer = []
  @last_flush = Time.now

  # Spawned last so the flusher thread starts after all state above is assigned.
  @flush_thread = spawn_interval_flusher
end

Public Instance Methods

<<(item)
Alias for: push
contents() click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 56
# Returns the internal buffer array, read while holding the operations mutex.
# The array itself (not a copy) is returned, so it keeps changing as items
# are pushed or flushed after this call returns.
def contents
  synchronize do |current|
    current
  end
end
flush() click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 39
# Flushes the buffer immediately while holding the operations mutex.
# Returns whatever flush_unsafe returns.
def flush
  synchronize do
    flush_unsafe
  end
end
push(item) click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 24
# Adds a single item to the buffer while holding the operations mutex.
# The push may trigger a flush (see push_unsafe).
#
# @param item the object to buffer
def push(item)
  synchronize { push_unsafe(item) }
end
Also aliased as: <<
push_multi(items) click to toggle source

Push multiple items onto the buffer in a single operation

# File lib/logstash/outputs/cassandra/buffer.rb, line 32
# Pushes several items onto the buffer under a single acquisition of the
# operations mutex.
#
# @param items [Array] the objects to buffer, in order
# @raise [ArgumentError] if items is not an Array
def push_multi(items)
  unless items.is_a?(Array)
    raise ArgumentError, "push multi takes an array!, not an #{items.class}!"
  end

  synchronize do
    items.each { |entry| push_unsafe(entry) }
  end
end
stop(do_flush=true,wait_complete=true) click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 43
# Stops the buffer: marks it as stopping, optionally flushes whatever is
# still buffered, and optionally waits for the interval flusher thread to
# exit. Only the first call does any work; later calls return nil at once.
#
# @param do_flush [Boolean] flush the remaining items before returning
# @param wait_complete [Boolean] join the interval flusher thread
def stop(do_flush=true,wait_complete=true)
  # make_true reports whether this call flipped the flag, so two concurrent
  # callers cannot both get past this line.
  return unless @stopping.make_true

  # Only the final flush needs the lock.
  synchronize { flush_unsafe } if do_flush

  # Join outside the lock: the flusher may already be past its stopping?
  # check and waiting for the mutex, so joining while holding it deadlocks.
  @flush_thread.join if wait_complete
end
synchronize() { |buffer| ... } click to toggle source

For operating on the buffer contents from outside the class: takes a block, yields the internal buffer to it, and runs the block while holding the internal mutex.

# File lib/logstash/outputs/cassandra/buffer.rb, line 63
# Runs the given block while holding the operations mutex, yielding the
# internal buffer array to it. Returns the block's result.
def synchronize
  @operations_mutex.synchronize do
    yield @buffer
  end
end

Private Instance Methods

flush_unsafe() click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 108
# Hands the buffered items to the submit proc and empties the buffer.
# Callers must already hold the operations mutex.
#
# If the submit proc raises, the items stay in the buffer and the error
# propagates, but the flush timestamp is still refreshed so the interval
# flusher waits a full interval before trying again instead of retrying on
# every tick.
#
# @return [Time] the refreshed flush timestamp
def flush_unsafe
  begin
    unless @buffer.empty?
      @submit_proc.call(@buffer)
      # Only reached when the submit succeeded.
      @buffer.clear
    end
  ensure
    # This must always be set to ensure correct timer behavior.
    @last_flush = Time.now
  end

  @last_flush
end
interval_flush() click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 88
# Flushes the buffer if at least @flush_interval seconds have passed since
# the last flush. Any error raised while flushing is logged as a warning and
# swallowed, so it does not propagate to the caller.
# Callers must already hold the operations mutex (flush_unsafe is used).
def interval_flush
  # Kept outside the begin block: errors in the due-check itself are not rescued.
  return unless last_flush_seconds_ago >= @flush_interval

  begin
    if @logger.debug?
      @logger.debug("Flushing buffer at interval",
                    :instance => self.inspect,
                    :interval => @flush_interval)
    end
    flush_unsafe
  rescue StandardError => error
    @logger.warn("Error flushing buffer at interval!",
                 :instance => self.inspect,
                 :message => error.message,
                 :class => error.class.name,
                 :backtrace => error.backtrace)
  rescue Exception => severe
    # Non-StandardError exceptions are logged too, with less detail.
    @logger.warn("Exception flushing buffer at interval!", :error => severe.message, :class => severe.class.name)
  end
end
last_flush_seconds_ago() click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 117
# Seconds elapsed between @last_flush and now, as returned by Time#-.
def last_flush_seconds_ago
  now = Time.now
  now - @last_flush
end
push_unsafe(item) click to toggle source

These methods are private for various reasons, chief among them thread safety: many of them are only safe to call while @operations_mutex is held.

# File lib/logstash/outputs/cassandra/buffer.rb, line 71
# Appends an item to the buffer and flushes once the buffer holds at least
# @max_size items. Callers must already hold the operations mutex.
#
# @param item the object to buffer
def push_unsafe(item)
  @buffer.push(item)
  flush_unsafe if @buffer.size >= @max_size
end
spawn_interval_flusher() click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 78
# Starts and returns the background thread that drives interval flushing.
# Each cycle sleeps 0.2 seconds, exits if the buffer is stopping, and
# otherwise runs interval_flush while holding the operations mutex.
def spawn_interval_flusher
  flusher = proc do
    loop do
      sleep(0.2)
      break if stopping?

      synchronize do
        interval_flush
      end
    end
  end

  Thread.new(&flusher)
end
stopping?() click to toggle source
# File lib/logstash/outputs/cassandra/buffer.rb, line 121
# True once stop has been called, i.e. the @stopping flag has been set.
def stopping?
  @stopping.value
end