class Flume::LogDevice
Public Class Methods
new(*args, &block)
click to toggle source
# File lib/flume/log_device.rb, line 9
# Builds a log device from an optional trailing options Hash and/or a
# configuration block.
#
# The options are wrapped in an OpenStruct, which is handed to the block (when
# one is given) so the caller can set or override fields before defaults are
# applied. Positional arguments other than a trailing Hash are ignored.
#
# Recognised fields and their fallbacks:
#   redis - connection object; falls back to a proc that builds Redis.new
#           (presumably resolved by a reader defined elsewhere in the file)
#   cap   - value later passed to truncate by write; falls back to 2**16
#   step  - starting write counter; falls back to 0
#   cycle - modulus applied to the write counter; falls back to 2**8
#   list  - Redis list key; falls back to 'flume:log'
def initialize(*args, &block)
  settings = {}
  settings = args.pop if args.last.is_a?(Hash)

  @config = OpenStruct.new(settings)
  block.call(@config) unless block.nil?

  @list  = @config.list  || 'flume:log'
  @cycle = @config.cycle || (2**8)
  @step  = @config.step  || 0
  @cap   = @config.cap   || (2**16)
  @redis = @config.redis || proc { Redis.new }
end
Public Instance Methods
channel()
click to toggle source
# File lib/flume/log_device.rb, line 22
# Name of the pub/sub channel used by this device: the list key prefixed
# with "flume:".
def channel
  format('flume:%s', list)
end
close()
click to toggle source
# File lib/flume/log_device.rb, line 44
# Best-effort shutdown of the Redis connection.
#
# Returns whatever redis.quit returns, or nil when it raises a StandardError
# (the failure is deliberately ignored).
def close
  redis.quit
rescue StandardError
  nil
end
size()
click to toggle source
# File lib/flume/log_device.rb, line 70
# Number of entries currently held in the Redis list (LLEN on the list key).
def size
  connection = redis
  connection.llen(list)
end
tail(n = 80)
click to toggle source
# File lib/flume/log_device.rb, line 48
# Returns up to +n+ entries from the head of the Redis list, in reversed
# order. Because write pushes with LPUSH, the head holds the newest entries,
# so the result reads oldest-to-newest.
#
# n - maximum number of entries to return (default 80).
#
# A non-positive +n+ yields an empty Array. Without this guard the range end
# n - 1 becomes negative, which LRANGE interprets as an offset from the end of
# the list (tail(0) would return the whole list).
def tail(n = 80)
  return [] if n < 1

  redis.lrange(list, 0, n - 1).reverse
end
tailf(&block)
click to toggle source
# File lib/flume/log_device.rb, line 52
# Follows the log: subscribes to the device's channel and calls the given
# block with each message received.
#
# When the subscription raises Redis::BaseConnectionError the error is printed
# to standard output, the method sleeps for one second and then subscribes
# again. There is no retry limit.
#
# Returns whatever redis.subscribe returns once the subscription ends.
def tailf(&block)
  redis.subscribe(channel) do |on|
    on.message { |_source, payload| block.call(payload) }
  end
rescue Redis::BaseConnectionError => failure
  puts "#{failure}, retrying in 1s"
  sleep 1
  retry
end
truncate(n)
click to toggle source
# File lib/flume/log_device.rb, line 66
# Trims the Redis list so that only its first +n+ entries (the head, which is
# where write pushes new messages) remain.
#
# n - number of entries to keep.
#
# A non-positive +n+ empties the list. LTRIM treats a negative stop index as
# an offset from the end, so the naive range 0..(n - 1) would keep everything
# for n == 0; an empty range (start > stop) is requested instead.
#
# Returns the reply from redis.ltrim.
def truncate(n)
  return redis.ltrim(list, 1, 0) if n < 1

  redis.ltrim(list, 0, n - 1)
end
write(message)
click to toggle source
# File lib/flume/log_device.rb, line 26
# Appends +message+ to the head of the Redis list and publishes it on the
# device's channel.
#
# A StandardError raised by the push is reported on STDERR (error text, class,
# backtrace, then the message itself) and otherwise ignored. Non-standard
# exceptions such as Interrupt or SystemExit are NOT swallowed, so a log write
# can no longer eat Ctrl-C or a process exit.
#
# The ensure section always runs, whether or not the push succeeded:
#   * the message is published (errors from publish propagate to the caller);
#   * when the write counter is at the start of a cycle, the list is trimmed
#     to +cap+ entries on a best-effort basis;
#   * the write counter advances, wrapping at +cycle+.
#
# Returns the reply from redis.lpush, or nil when the push failed with a
# StandardError.
def write(message)
  redis.lpush(list, message)
rescue StandardError => e
  STDERR.puts("#{e.message} (#{e.class})\n#{Array(e.backtrace).join("\n")}")
  STDERR.puts(message)
ensure
  redis.publish(channel, message)
  if (step % cycle).zero?
    begin
      truncate(cap)
    rescue StandardError
      nil # trimming is best-effort; a failure must not break logging
    end
  end
  self.step = (step + 1) % cycle
end