class ActivePublisher::Async::RedisAdapter::Consumer

Constants

SUPERVISOR_INTERVAL

Attributes

consumers[R]
queue[R]
supervisor[R]

Public Class Methods

new(redis_pool) click to toggle source
# File lib/active_publisher/async/redis_adapter/consumer.rb, line 14
# Builds the multi-pop queue on top of the supplied pool, starts with an
# empty consumer registry, and then spins up the supervised consumers.
#
# @param redis_pool the pool object handed straight to RedisMultiPopQueue
#   (NOTE(review): presumably a connection pool of Redis clients - confirm)
def initialize(redis_pool)
  queue_class = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue
  list_key = ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY

  @queue = queue_class.new(redis_pool, list_key)
  # Registry of consumer threads, keyed by a generated id.
  @consumers = {}

  create_and_supervise_consumers!
end

Public Instance Methods

create_and_supervise_consumers!() click to toggle source
# File lib/active_publisher/async/redis_adapter/consumer.rb, line 20
# Starts one consumer thread per configured publisher thread and pairs each
# with its own timer task that watches over it.
#
# On every tick a timer task:
# * replaces its consumer with a fresh one (after a best-effort kill) when
#   the current one no longer reports alive, emitting
#   "async_queue.thread_restart";
# * emits "redis_async_queue_size.active_publisher" with the queue size.
#
# @return the result of +publisher_threads.times+
def create_and_supervise_consumers!
  thread_count = ::ActivePublisher.configuration.publisher_threads
  spawn_consumer = lambda do
    ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
  end

  thread_count.times do
    # Each consumer lives in its own slot so its watchdog can swap it out.
    slot = ::SecureRandom.uuid
    consumers[slot] = spawn_consumer.call

    watchdog = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
      current = consumers[slot]
      unless current.alive?
        # Best effort only: a failure while killing must not prevent the
        # replacement consumer from being started.
        begin
          current.kill
        rescue StandardError
          nil
        end

        consumers[slot] = spawn_consumer.call
        ::ActiveSupport::Notifications.instrument "async_queue.thread_restart"
      end

      # Publish the current queue size on every tick.
      ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
    end

    watchdog.execute
  end
end
size() click to toggle source
# File lib/active_publisher/async/redis_adapter/consumer.rb, line 41
# Reports the size of the underlying queue.
#
# @return whatever +queue.size+ returns
def size
  pending_count = queue.size
  pending_count
end