class ActivePublisher::Async::InMemoryAdapter::AsyncQueue
Constants
- BACK_PRESSURE_STRATEGIES
These strategies are used to determine what to do with messages when the queue is full. :raise - Raise an error and drop the message. :drop - Silently drop the message. :wait - Wait for space in the queue to become available.
Attributes
back_pressure_strategy[RW]
consumers[R]
max_queue_size[RW]
queue[R]
supervisor[R]
supervisor_interval[RW]
Public Class Methods
new(back_pressure_strategy, max_queue_size, supervisor_interval)
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 19 def initialize(back_pressure_strategy, max_queue_size, supervisor_interval) self.back_pressure_strategy = back_pressure_strategy @max_queue_size = max_queue_size @supervisor_interval = supervisor_interval @queue = ::MultiOpQueue::Queue.new @consumers = {} create_and_supervise_consumers! end
Public Instance Methods
back_pressure_strategy=(strategy)
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 28 def back_pressure_strategy=(strategy) fail ::ArgumentError, "Invalid back pressure strategy: #{strategy}" unless BACK_PRESSURE_STRATEGIES.include?(strategy) @back_pressure_strategy = strategy end
push(message)
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 33 def push(message) if queue.size >= max_queue_size case back_pressure_strategy when :drop ::ActiveSupport::Notifications.instrument "message_dropped.active_publisher" return when :raise ::ActiveSupport::Notifications.instrument "message_dropped.active_publisher" fail ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError, "Queue is full, messages will be dropped." when :wait ::ActiveSupport::Notifications.instrument "wait_for_async_queue.active_publisher" do # This is a really crappy way to wait sleep 0.01 until queue.size < max_queue_size end end end queue.push(message) end
size()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 53 def size # Requests might be in flight (out of the queue, but not yet published), so taking the max should be # good enough to make sure we're honest about the actual queue size. return queue.size if consumers.empty? [queue.size, consumer_sampled_queue_size].max end
Private Instance Methods
consumer_sampled_queue_size()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 89 def consumer_sampled_queue_size consumers.values.map(&:sampled_queue_size).max end
create_and_supervise_consumers!()
click to toggle source
# File lib/active_publisher/async/in_memory_adapter/async_queue.rb, line 62 def create_and_supervise_consumers! ::ActivePublisher.configuration.publisher_threads.times do consumer_id = ::SecureRandom.uuid consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) supervisor_task = ::Concurrent::TimerTask.new(:execution_interval => supervisor_interval) do current_time = ::Time.now consumer = consumers[consumer_id] # Consumer is lagging if it does not "tick" at least once every 10 seconds. seconds_since_last_tick = current_time - consumer.last_tick_at consumer_is_lagging = seconds_since_last_tick > ::ActivePublisher.configuration.max_async_publisher_lag_time logger.error "ActivePublisher consumer is lagging. Last consumer tick was #{seconds_since_last_tick} seconds ago." if consumer_is_lagging # Check to see if we should restart the consumer. if !consumer.alive? || consumer_is_lagging consumer.kill rescue nil consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) ::ActiveSupport::Notifications.instrument "async_queue.thread_restart" end # Notify the current queue size. ::ActiveSupport::Notifications.instrument "async_queue_size.active_publisher", queue.size end supervisor_task.execute end end