class Flume::LogDevice

Public Class Methods

new(*args, &block) click to toggle source
# File lib/flume/log_device.rb, line 9
def initialize(*args, &block)
  options = args.last.is_a?(Hash) ? args.pop : {}

  @config = OpenStruct.new(options)
  block.call(@config) if block

  @redis = @config.redis || proc { Redis.new }
  @cap   = @config.cap   || (2 ** 16)
  @step  = @config.step  || 0
  @cycle = @config.cycle || (2 ** 8)
  @list  = @config.list  || 'flume:log'
end

Public Instance Methods

channel() click to toggle source
# File lib/flume/log_device.rb, line 22
def channel
  "flume:#{list}"
end
close() click to toggle source
# File lib/flume/log_device.rb, line 44
def close
  redis.quit rescue nil
end
size() click to toggle source
# File lib/flume/log_device.rb, line 70
def size
  redis.llen(list)
end
tail(n = 80) click to toggle source
# File lib/flume/log_device.rb, line 48
def tail(n = 80)
  redis.lrange(list, 0, n - 1).reverse
end
tailf(&block) click to toggle source
# File lib/flume/log_device.rb, line 52
def tailf(&block)
  begin
    redis.subscribe(channel) do |on|
      on.message do |channel, message|
        block.call(message)
      end
    end
  rescue Redis::BaseConnectionError => error
    puts "#{error}, retrying in 1s"
    sleep 1
    retry
  end
end
truncate(n) click to toggle source
# File lib/flume/log_device.rb, line 66
def truncate(n)
  redis.ltrim(list, 0, n - 1)
end
write(message) click to toggle source
# File lib/flume/log_device.rb, line 26
def write(message)
  begin
    redis.lpush(list, message)
  rescue Object => e
    error = "#{ e.message } (#{ e.class })\n#{ Array(e.backtrace).join(10.chr) }"
    STDERR.puts(error)
    STDERR.puts(message)
  end
ensure
  redis.publish(channel, message)

  if (step % cycle).zero?
    truncate(cap) rescue nil
  end

  self.step = (step + 1) % cycle
end