class Atatus::Transport::Base
@api private
Constants
- WATCHER_EXECUTION_INTERVAL
- WATCHER_TIMEOUT_INTERVAL
- WORKER_JOIN_TIMEOUT
Attributes
config[R]
filters[R]
queue[R]
stopped[R]
watcher[R]
workers[R]
Public Class Methods
new(config)
click to toggle source
# File lib/atatus/transport/base.rb, line 41
# Sets up the transport's state from +config+.
#
# @param config [Object] must respond to +api_buffer_size+ and +pool_size+;
#   it is also passed to Serializers.new and Filters.new
def initialize(config)
  @config = config

  # Event pipeline: a bounded queue plus the serializers and filters.
  @queue = SizedQueue.new(config.api_buffer_size)
  @serializers = Serializers.new(config)
  @filters = Filters.new(config)

  # Worker bookkeeping: the stopped flag starts out false and there is one
  # (initially nil) slot per worker.
  @stopped = Concurrent::AtomicBoolean.new(false)
  @workers = Array.new(config.pool_size)
  @worker_mutex = Mutex.new
end
Public Instance Methods
add_filter(key, callback)
click to toggle source
# File lib/atatus/transport/base.rb, line 94
# Registers +callback+ under +key+ by delegating to the filters object.
#
# @param key [Object] passed through unchanged to the filters' +add+
# @param callback [Object] passed through unchanged to the filters' +add+
# @return whatever the filters' +add+ returns
def add_filter(key, callback)
  filters.add(key, callback)
end
handle_forking!()
click to toggle source
# File lib/atatus/transport/base.rb, line 98
# Shuts down the current watcher, re-runs the worker count check and
# creates a new watcher.
#
# A plain stop followed by start is deliberately avoided: the StopMessage
# would then be the first message processed when the transport is restarted.
def handle_forking!
  stop_watcher
  ensure_worker_count
  create_watcher
end
start()
click to toggle source
# File lib/atatus/transport/base.rb, line 56
# Starts (or restarts) the transport: clears the stopped flag, boots any
# missing workers and creates the watcher.
#
# @return the result of +create_watcher+
def start
  debug('%s: Starting Transport', pid_str)

  # The flag has to be cleared before anything else in case the transport is
  # being restarted: ensure_worker_count does nothing while it is set.
  @stopped.make_false if @stopped.true?

  ensure_worker_count
  create_watcher
end
stop()
click to toggle source
# File lib/atatus/transport/base.rb, line 67
# Stops the transport: sets the stopped flag, then shuts down the watcher
# and the workers, in that order.
#
# @return the result of +stop_workers+
def stop
  debug('%s: Stopping Transport', pid_str)

  # Set the flag first so submit rejects new events during shutdown.
  @stopped.make_true

  stop_watcher
  stop_workers
end
submit(resource)
click to toggle source
# File lib/atatus/transport/base.rb, line 76
# Enqueues +resource+ for the workers without blocking the caller.
#
# @param resource [Object] event to push onto the worker queue
# @return [true] when the resource was queued
# @return [false] when the transport is stopped and the resource was dropped
# @return [nil] when the queue is full or the push failed with an error
def submit(resource)
  if @stopped.true?
    warn '%s: Transport stopping, no new events accepted', pid_str
    debug 'Dropping: %s', resource.inspect
    return false
  end

  # Non-blocking push: a full queue raises ThreadError instead of waiting.
  queue.push(resource, true)

  true
rescue ThreadError
  throttled_queue_full_warning
  nil
rescue StandardError => e
  # StandardError rather than Exception: Interrupt, SystemExit and
  # NoMemoryError must reach the host process instead of being swallowed
  # and turned into a nil return value.
  error '%s: Failed adding to the transport queue: %p', pid_str, e.inspect
  nil
end
Private Instance Methods
all_workers_alive?()
click to toggle source
# File lib/atatus/transport/base.rb, line 133
# Reports whether every worker slot holds a live thread.
#
# @return [Boolean] false as soon as one slot is nil or holds a thread that
#   is no longer alive; true otherwise (including when there are no slots)
def all_workers_alive?
  workers.none? { |worker| worker.nil? || !worker.alive? }
end
boot_worker()
click to toggle source
# File lib/atatus/transport/base.rb, line 137
# Spawns a thread that builds a Worker and calls +work_forever+ on it.
#
# @return [Thread] the newly started thread
def boot_worker
  debug('%s: Booting worker...', pid_str)

  Thread.new do
    # The worker is built inside the thread and shares this transport's
    # queue, serializers and filters.
    worker = Worker.new(config, queue, serializers: @serializers, filters: @filters)
    worker.work_forever
  end
end
create_watcher()
click to toggle source
# File lib/atatus/transport/base.rb, line 113
# Creates a timer task whose block calls +ensure_worker_count+ and stores
# it in +@watcher+.
#
# @return the object returned by Concurrent::TimerTask.execute
def create_watcher
  timer_options = {
    execution_interval: WATCHER_EXECUTION_INTERVAL,
    timeout_interval: WATCHER_TIMEOUT_INTERVAL
  }

  @watcher = Concurrent::TimerTask.execute(timer_options) do
    ensure_worker_count
  end
end
ensure_worker_count()
click to toggle source
# File lib/atatus/transport/base.rb, line 120
# Replaces every empty or dead worker slot with a freshly booted worker.
#
# Runs under the worker mutex. Does nothing when all workers are alive or
# when the transport is stopped.
#
# @return [Array, nil] the workers array after the refill, or nil when
#   nothing had to be done
def ensure_worker_count
  @worker_mutex.synchronize do
    # Liveness is checked first; the stopped flag is only consulted when at
    # least one slot needs attention.
    return if all_workers_alive? || stopped.true?

    @workers.map! do |slot|
      slot&.alive? ? slot : boot_worker
    end
  end
end
pid_str()
click to toggle source
# File lib/atatus/transport/base.rb, line 109
# Builds the log prefix identifying the current process.
#
# @return [String] "[PID:<pid>]" for the pid reported by Process.pid
def pid_str
  "[PID:#{Process.pid}]"
end
send_stop_messages()
click to toggle source
# File lib/atatus/transport/base.rb, line 171
# Pushes +config.pool_size+ stop messages onto the queue without blocking.
#
# If a push raises ThreadError, the remaining messages are not sent and a
# warning is logged instead.
def send_stop_messages
  config.pool_size.times do
    stop_message = Worker::StopMessage.new
    queue.push(stop_message, true)
  end
rescue ThreadError
  warn('Cannot push stop messages to worker queue as it is full')
end
stop_watcher()
click to toggle source
# File lib/atatus/transport/base.rb, line 177
# Shuts the watcher down if one has been created.
#
# @return the result of the watcher's +shutdown+, or nil when there is no
#   watcher
def stop_watcher
  return if watcher.nil?

  watcher.shutdown
end
stop_workers()
click to toggle source
# File lib/atatus/transport/base.rb, line 149
# Sends the stop messages, then waits up to WORKER_JOIN_TIMEOUT for each
# worker thread and kills any thread that has not finished by then.
#
# @return [Array] the workers array, with every slot reset to nil
def stop_workers
  debug('%s: Stopping workers', pid_str)

  send_stop_messages

  @worker_mutex.synchronize do
    # Empty slots are skipped; only real threads are joined.
    workers.compact.each do |worker|
      # join returns nil when the timeout elapses before the thread ends.
      unless worker.join(WORKER_JOIN_TIMEOUT)
        debug('%s: Worker did not stop in %ds, killing...', pid_str, WORKER_JOIN_TIMEOUT)
        worker.kill
      end
    end

    # Keep the array at its original size so the transport can be restarted.
    @workers.fill(nil)
  end
end
throttled_queue_full_warning()
click to toggle source
# File lib/atatus/transport/base.rb, line 181
# Logs the "queue is full" warning through a Util::Throttle that is created
# on first use and then reused for every later call.
#
# @return whatever the throttle's +call+ returns
def throttled_queue_full_warning
  @queue_full_log ||= Util::Throttle.new(5) do
    warn('%s: Queue is full (%i items), skipping…', pid_str, config.api_buffer_size)
  end

  @queue_full_log.call
end