class Asynk::Server
Public Class Methods
new()
click to toggle source
# File lib/asynk/server.rb, line 5 def initialize end
Public Instance Methods
run()
click to toggle source
# File lib/asynk/server.rb, line 8 def run require 'asynk/worker' prepare_consumers register_signal_handlers Asynk.logger.info "All consumers are prepared" handle_signals # handle_signals end
shutdown()
click to toggle source
# File lib/asynk/server.rb, line 17 def shutdown futures = workers.map { |w| w.future.shutdown } futures.map(&:value) Asynk.broker.amqp_connection.close Asynk.logger.info "Server shutdown!" end
Private Instance Methods
handle_signals()
click to toggle source
# File lib/asynk/server.rb, line 25 def handle_signals loop do signal = Thread.main[:signal_queue].shift if signal Asynk.logger.info "Caught sig#{signal.downcase}, stopping asynk server..." shutdown break end sleep(0.1) end end
prepare_consumer(consumer)
click to toggle source
# File lib/asynk/server.rb, line 56 def prepare_consumer(consumer) consumer.concurrency.times do |index| workers << Asynk::Worker.new(Asynk.broker.amqp_connection, consumer, index) end end
prepare_consumers()
click to toggle source
# File lib/asynk/server.rb, line 52 def prepare_consumers Asynk.consumers.each{ |consumer| prepare_consumer(consumer) } end
register_signal_handlers()
click to toggle source
# File lib/asynk/server.rb, line 41 def register_signal_handlers Thread.main[:signal_queue] = [] %w(QUIT TERM INT).keep_if { |s| Signal.list.keys.include? s }.map(&:to_sym).each do |sig| # This needs to be reentrant, so we queue up signals to be handled # in the run loop, rather than acting on signals here trap(sig) do Thread.main[:signal_queue] << sig end end end
workers()
click to toggle source
# File lib/asynk/server.rb, line 37 def workers @workers ||= [] end