class ActivePublisher::Async::RedisAdapter::Adapter

Constants

SUPERVISOR_INTERVAL

Attributes

async_queue[R]
flush_max[R]
flush_min[R]
queue[R]
redis_pool[R]

Public Class Methods

new(new_redis_pool) click to toggle source
# File lib/active_publisher/async/redis_adapter.rb, line 24
# Builds the adapter around a redis connection pool and starts a periodic
# supervisor that drains the in-memory queue.
#
# Sets up, in order: the redis pool, a Consumer built from that pool, the
# in-memory queue, and the flush thresholds (flush_max comes from
# ActivePublisher.configuration.messages_per_batch; flush_min is half of it,
# using integer division).
#
# @param new_redis_pool the pool handed to the Consumer and later used by
#   flush_queue! (presumably responds to `with` — see flush_queue!)
def initialize(new_redis_pool)
  logger.info "Starting redis publisher adapter"
  # do something with supervision ?
  @redis_pool = new_redis_pool
  @async_queue = ::ActivePublisher::Async::RedisAdapter::Consumer.new(redis_pool)
  @queue = ::MultiOpQueue::Queue.new

  batch_size = ::ActivePublisher.configuration.messages_per_batch
  @flush_max = batch_size
  @flush_min = batch_size / 2

  # Each supervisor run flushes once per 50 queued messages, but always at
  # least once and never more than 5 times.
  supervisor = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
    flush_count = queue.size / 50
    flush_count = 1 if flush_count < 1
    flush_count = 5 if flush_count > 5

    flush_count.times do
      flush_queue!
    end
  end

  supervisor.execute
end

Public Instance Methods

publish(route, payload, exchange_name, options = {}) click to toggle source
# File lib/active_publisher/async/redis_adapter.rb, line 43
# Queues a message for asynchronous delivery.
#
# The arguments are wrapped in an ActivePublisher::Message, serialized with
# Marshal, and appended to the in-memory queue. The queue is flushed right
# away when its size has reached flush_min or when the caller passes a truthy
# options[:flush_queue].
#
# @param route         passed through to ActivePublisher::Message
# @param payload       passed through to ActivePublisher::Message
# @param exchange_name passed through to ActivePublisher::Message
# @param options [Hash] passed through to the message; :flush_queue forces a flush
# @return [nil]
def publish(route, payload, exchange_name, options = {})
  envelope = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
  serialized = ::Marshal.dump(envelope)
  queue << serialized

  # Size check first, explicit flush request second.
  if queue.size >= flush_min || options[:flush_queue]
    flush_queue!
  end

  nil
end
shutdown!() click to toggle source
# File lib/active_publisher/async/redis_adapter.rb, line 51
# Drains the in-memory queue and then pauses briefly before returning.
#
# Calls flush_queue! repeatedly until the queue reports empty, then sleeps.
def shutdown!
  logger.info "Draining async publisher redis adapter before shutdown."

  until queue.empty?
    flush_queue!
  end

  # Per the original author: redis in AOF mode most commonly runs `fsync`
  # every 1 second, so waiting 2.1 seconds leaves room for at least one full
  # `fsync` before the process dies.
  fsync_grace_seconds = 2.1
  sleep fsync_grace_seconds
end

Private Instance Methods

flush_queue!() click to toggle source
# File lib/active_publisher/async/redis_adapter.rb, line 61
# Moves up to flush_max serialized messages from the in-memory queue to the
# redis list named by REDIS_LIST_KEY.
#
# Does nothing (returns nil) when the queue is empty, or when the pop yields
# nil, something that is not enumerable, or an empty collection.
#
# @return the result of the redis `rpush` call, or nil when nothing was pushed
def flush_queue!
  return if queue.empty?

  # Short timeout so the pop does not wait long if the queue was emptied in the meantime.
  batch = queue.pop_up_to(flush_max, :timeout => 0.001)

  nothing_to_push = batch.nil? || !batch.respond_to?(:each) || batch.size <= 0
  return if nothing_to_push

  redis_pool.with do |connection|
    connection.rpush(::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY, batch)
  end
end