class Fluent::Plugin::RedisListPollerInput

Public Class Methods

new()

Initialize a new input plugin.

@since 0.1.0 @return [NilClass]

Calls superclass method Fluent::PluginMixin::Redis::new
# File lib/fluent/plugin/in_redis_list_source.rb, line 32
def initialize
  super
  require 'msgpack'
end

Public Instance Methods

action_locking_monitor()

Action to execute when polling for the lock key. It copies the current value of the lock key from Redis into the plugin's local storage.

@since 0.1.0 @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 186
def action_locking_monitor
  lock_value = @redis.get(@lock_key)
  @storage.put(@lock_key, lock_value)
end

action_poll()

Action to execute when the poller event watcher executes

Because the watcher is lightweight, this method simply returns when the worker has been flagged as sleeping instead of actually blocking in a sleep call. Blocking seemed to cause locking.

Otherwise we iterate through messages, parse and emit them.

@since 0.1.0 @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 201
def action_poll
  if sleeping?
    log.trace "redis worker is sleeping"
    return
  end

  if locked?
    log.trace "redis queue is locked"
    return
  end

  poll_messages do |message|
    if message.nil?
      log.debug "redis queue is empty"
      sleep!(@sleep_interval)
      break
    end

    @parser.parse(message) do |time, record|
      if time && record
        router.emit @tag || @key, time || Engine.now, record
      else
        log.warn "failed to parse message: #{message}"
      end
    end
  end
rescue => e
  log.error "error fetching record", :error => e
  log.error_backtrace
  sleep!(@retry_interval)
end

batched?()

Whether to fetch a single item or multiple items in a batch.

@since 0.1.0 @return [TrueClass, FalseClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 133
def batched?
  @batch_size and @batch_size > 1
end

configure(config)

Initialize attributes and parameters.

@since 0.1.0 @return [NilClass]

Calls superclass method
# File lib/fluent/plugin/in_redis_list_source.rb, line 40
def configure(config)
  super

  configure_params(config)
  configure_parser(config)
  configure_locking(config)

  @retry_at = nil
end

configure_locking(config)

Configure locking.

@since 0.1.0 @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 77
def configure_locking(config)
  @storage  = storage_create(type: 'local')
  @lock_key = "fluentd:#{@key}:lock"
end

configure_params(config)

Configure plugin parameters. Raises Fluent::ConfigError if a required key is missing or if the command is neither lpop nor rpop.

@since 0.1.0 @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 53
def configure_params(config)
  %w(host port key command tag).each do |key|
    next if instance_variable_get("@#{key}")
    raise Fluent::ConfigError, "configuration key missing: #{key}"
  end

  unless %w(lpop rpop).include?(@command)
    raise Fluent::ConfigError, "command must be either lpop or rpop"
  end
end
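
The validation above can be sketched outside Fluentd roughly as follows. This is only an illustration: `ParamCheckSketch`, `validation_error`, and the sample values are hypothetical names, and `ArgumentError` stands in for `Fluent::ConfigError` so the snippet runs without the fluentd gem.

```ruby
# Hypothetical stand-in for the plugin: parameters become instance variables,
# as they would after Fluentd has processed the configuration.
class ParamCheckSketch
  REQUIRED = %w[host port key command tag].freeze
  COMMANDS = %w[lpop rpop].freeze

  def initialize(params)
    params.each { |name, value| instance_variable_set("@#{name}", value) }
  end

  # ArgumentError replaces Fluent::ConfigError in this sketch.
  def validate!
    REQUIRED.each do |name|
      next if instance_variable_get("@#{name}")
      raise ArgumentError, "configuration key missing: #{name}"
    end

    unless COMMANDS.include?(@command)
      raise ArgumentError, "command must be either lpop or rpop"
    end

    true
  end
end

# Returns nil when the parameters are valid, otherwise the error message.
def validation_error(params)
  ParamCheckSketch.new(params).validate!
  nil
rescue ArgumentError => e
  e.message
end

# Sample values are made up for the example.
base = { host: "localhost", port: 6379, key: "events",
         command: "lpop", tag: "redis.events" }

ok      = validation_error(base)
missing = validation_error(base.merge(tag: nil))
bad     = validation_error(base.merge(command: "blpop"))
```

Here `ok` is nil, while `missing` and `bad` carry the two messages raised by the checks.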

configure_parser(config)

Configure the record parser.

@since 0.1.0 @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 67
def configure_parser(config)
  parser_config = @parse.corresponding_config_element
  parser_type = parser_config['@type']
  @parser = Fluent::Plugin.new_parser(parser_type, :parent => self)
  @parser.configure(parser_config)
end

locked?()

Whether the poller has been locked.

@since 0.1.0 @return [TrueClass, FalseClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 148
def locked?
  @storage.get(@lock_key)
end
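
How `action_locking_monitor` and `locked?` cooperate can be sketched as below. The sketch makes loud assumptions: `LockMonitorSketch` and `refresh_lock` are hypothetical names, and plain hashes stand in for both the Redis client and the Fluentd local storage, so no server is contacted.

```ruby
# Hypothetical stand-in: one Hash plays the Redis server, another the
# plugin's local storage.
class LockMonitorSketch
  def initialize(redis, key)
    @redis    = redis
    @storage  = {}
    @lock_key = "fluentd:#{key}:lock"
  end

  # Mirrors action_locking_monitor: copy the remote lock value locally.
  def refresh_lock
    @storage[@lock_key] = @redis[@lock_key]
  end

  # Mirrors locked?, coerced to a strict boolean for the example.
  def locked?
    !!@storage[@lock_key]
  end
end

fake_redis = {}
monitor = LockMonitorSketch.new(fake_redis, "events")

monitor.refresh_lock
unlocked = monitor.locked?

# Setting the key remotely has no effect until the next refresh.
fake_redis["fluentd:events:lock"] = "1"
still_unlocked = monitor.locked?

monitor.refresh_lock
locked_after_refresh = monitor.locked?
```

The point of the design is that the poller only ever consults local state, so a lock set in Redis takes effect on the monitor's next tick rather than immediately.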

poll_messages() { |message| ... }

Poll messages from the Redis server in either single-message or batch mode.

@since 0.1.0 @param [&block] the block to yield single messages to @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 165
def poll_messages
  commands = []

  if batched?
    @redis.pipelined do
      @batch_size.times do
        commands << @redis.call(@command, @key)
      end
    end
  else
    commands << @redis.call(@command, @key)
  end

  commands.each do |command|
    yield command.is_a?(Redis::Future) ? command.value : command
  end
end
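
The single versus batch behaviour can be illustrated with a minimal sketch. It assumes an in-memory `FakeListClient` (a hypothetical class, not part of redis-rb) and a hypothetical `poll_sketch` helper; it does not model pipelining or `Redis::Future`, only the order in which replies are yielded.

```ruby
# Hypothetical in-memory client that imitates LPOP/RPOP on a single list.
class FakeListClient
  def initialize(items)
    @items = items.dup
  end

  # Returns nil once the list is empty, as Redis does for an empty list.
  def call(command, _key)
    case command
    when "lpop" then @items.shift
    when "rpop" then @items.pop
    else raise ArgumentError, "unsupported command: #{command}"
    end
  end
end

# Issue one command, or batch_size commands when batching is enabled,
# then yield every reply (including nil) to the caller.
def poll_sketch(client, command, key, batch_size)
  count = batch_size && batch_size > 1 ? batch_size : 1
  replies = Array.new(count) { client.call(command, key) }
  replies.each { |reply| yield reply }
end

client = FakeListClient.new(%w[a b c])
seen = []

# Two polls with a batch size of 2 against a three-item list.
poll_sketch(client, "lpop", "events", 2) { |message| seen << message }
poll_sketch(client, "lpop", "events", 2) { |message| seen << message }
```

After the second poll `seen` ends with a nil entry; in the plugin, `action_poll` treats that nil as the signal that the queue is empty and schedules a sleep.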

shutdown()

Tear down the plugin.

@since 0.1.0 @return [NilClass]

Calls superclass method
# File lib/fluent/plugin/in_redis_list_source.rb, line 125
def shutdown
  super
  shutdown_redis
end

sleep!(delay = @sleep_interval)

Set a sleep delay, ensuring that we will not attempt to fetch messages until it expires.

@since 0.1.0 @param [Integer] delay the number of seconds to wait @return [Integer] timestamp when this expires

# File lib/fluent/plugin/in_redis_list_source.rb, line 156
def sleep!(delay = @sleep_interval)
  @retry_at = Engine.now + delay
end

sleeping?()

Whether the poller has been temporarily disabled or should fetch messages.

@since 0.1.0 @return [TrueClass, FalseClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 141
def sleeping?
  @retry_at and @retry_at >= Engine.now
end
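
The interplay of `sleep!` and `sleeping?` amounts to a non-blocking deadline check, which might be sketched as follows. `SleepGateSketch` and its injectable clock are hypothetical and replace `Engine.now` so the example runs without Fluentd.

```ruby
# Hypothetical gate: instead of blocking, it records a deadline and lets
# callers ask whether that deadline has passed.
class SleepGateSketch
  def initialize(clock)
    @clock    = clock   # callable returning the current time in seconds
    @retry_at = nil
  end

  # Mirrors sleep!: remember when polling may resume.
  def sleep!(delay)
    @retry_at = @clock.call + delay
  end

  # Mirrors sleeping?: true up to and including the deadline.
  def sleeping?
    !@retry_at.nil? && @retry_at >= @clock.call
  end
end

now  = 100
gate = SleepGateSketch.new(-> { now })

before = gate.sleeping?

gate.sleep!(5)
during = gate.sleeping?

now = 105
at_deadline = gate.sleeping?

now = 106
after = gate.sleeping?
```

Because the comparison is `>=`, the gate still reports sleeping at the exact deadline and only opens once the clock has moved past it.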

start()

Prepare the plugin event loop

This method will initialize the Redis connection object, create any required Redis structures as well as define and begin the event pollers.

@since 0.1.0 @return [NilClass]

Calls superclass method
# File lib/fluent/plugin/in_redis_list_source.rb, line 89
def start
  super
  start_redis
  start_poller
  start_monitor
end

start_monitor()

Prepare the Redis queue monitor

This timed event will routinely poll for a lock key and disable the queue poller if required

@since 0.1.1 @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 116
def start_monitor
  timer_execute(:monitor, 1) do
    action_locking_monitor
  end
end

start_poller()

Prepare the Redis queue poller

This timed event will routinely poll items from the Redis list and emit those through the pipeline.

@since 0.1.0 @return [NilClass]

# File lib/fluent/plugin/in_redis_list_source.rb, line 103
def start_poller
  timer_execute(:poller, @poll_interval) do
    action_poll
  end
end