class ActivePublisher::Async::RedisAdapter::Consumer
Constants
- SUPERVISOR_INTERVAL
Attributes
consumers[R]
queue[R]
supervisor[R]
Public Class Methods
new(redis_pool)
click to toggle source
# File lib/active_publisher/async/redis_adapter/consumer.rb, line 14
#
# Builds the consumer: wraps the given pool in a RedisMultiPopQueue keyed by
# REDIS_LIST_KEY, starts with an empty consumer registry, and immediately
# spins up the consumers together with their supervisor tasks.
#
# redis_pool - handed straight to RedisMultiPopQueue.new; presumably a Redis
#              connection pool (NOTE(review): confirm against the queue class).
def initialize(redis_pool)
  @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(
    redis_pool,
    ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY
  )
  @consumers = {}

  create_and_supervise_consumers!
end
Public Instance Methods
create_and_supervise_consumers!()
click to toggle source
# File lib/active_publisher/async/redis_adapter/consumer.rb, line 20
#
# Creates one ConsumerThread per configured publisher thread, registers it in
# +consumers+ under a freshly generated UUID, and starts a recurring
# Concurrent::TimerTask (configured by SUPERVISOR_INTERVAL) for each of them.
#
# Every run of a timer task:
#   1. looks up the consumer registered under its UUID;
#   2. if that consumer is no longer alive, kills it (ignoring any
#      StandardError), registers a replacement ConsumerThread under the same
#      UUID, and instruments "async_queue.thread_restart";
#   3. instruments "redis_async_queue_size.active_publisher" with the current
#      queue size.
#
# Returns the value of ::ActivePublisher.configuration.publisher_threads
# (the receiver of +times+).
def create_and_supervise_consumers!
  thread_count = ::ActivePublisher.configuration.publisher_threads

  thread_count.times do
    id = ::SecureRandom.uuid
    consumers[id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)

    watchdog = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
      current = consumers[id]

      unless current.alive?
        # Best-effort cleanup of the dead consumer: a failure to kill it must
        # not prevent the replacement from being created.
        begin
          current.kill
        rescue StandardError
          nil
        end

        consumers[id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
        ::ActiveSupport::Notifications.instrument "async_queue.thread_restart"
      end

      # Notify the current queue size.
      ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
    end

    watchdog.execute
  end
end
size()
click to toggle source
# File lib/active_publisher/async/redis_adapter/consumer.rb, line 41
#
# Reports the size of the underlying queue by delegating to +queue.size+.
# NOTE(review): presumably the number of messages still waiting in the Redis
# list — confirm against RedisMultiPopQueue#size.
def size
  queue.size
end