class Asynk::Server

Public Class Methods

new() click to toggle source
# File lib/asynk/server.rb, line 5
# Builds a server with no eager state; the worker list is created lazily
# the first time #workers is called.
def initialize; end

Public Instance Methods

run() click to toggle source
# File lib/asynk/server.rb, line 8
# Starts the server and blocks until a stop signal has been handled.
#
# Steps, in order: load the worker class, build workers for every
# registered consumer, install the signal traps, log readiness, and then
# enter the signal loop (which only returns after #shutdown has run).
def run
  # Loaded at call time rather than at file load.
  require('asynk/worker')

  prepare_consumers
  register_signal_handlers
  Asynk.logger.info("All consumers are prepared")

  handle_signals
end
shutdown() click to toggle source
# File lib/asynk/server.rb, line 17
# Stops every worker, closes the broker's AMQP connection and logs that
# the server is down.
#
# Shutdown is requested on all workers first (each request yields a
# future), and only afterwards is each future's value awaited, so the
# workers are asked to stop before any single one is waited on.
def shutdown
  pending = workers.map { |worker| worker.future.shutdown }
  pending.each(&:value)

  Asynk.broker.amqp_connection.close
  Asynk.logger.info("Server shutdown!")
end

Private Instance Methods

handle_signals() click to toggle source
# File lib/asynk/server.rb, line 25
# Blocks until a signal shows up in the main thread's :signal_queue, then
# logs it, shuts the server down and returns nil.
#
# The queue is polled every 0.1 seconds; only the first queued signal is
# consumed, any further entries are left in the queue.
def handle_signals
  loop do
    caught = Thread.main[:signal_queue].shift

    # Nothing queued yet: wait a moment and poll again.
    unless caught
      sleep(0.1)
      next
    end

    Asynk.logger.info "Caught sig#{caught.downcase}, stopping asynk server..."
    shutdown
    break
  end
end
prepare_consumer(consumer) click to toggle source
# File lib/asynk/server.rb, line 56
# Creates `consumer.concurrency` workers for the given consumer and
# appends them to #workers.
#
# Each worker receives the broker's AMQP connection, the consumer itself
# and its zero-based position among that consumer's workers.
def prepare_consumer(consumer)
  consumer.concurrency.times { |slot|
    workers.push(Asynk::Worker.new(Asynk.broker.amqp_connection, consumer, slot))
  }
end
prepare_consumers() click to toggle source
# File lib/asynk/server.rb, line 52
# Builds the workers for every consumer listed in Asynk.consumers by
# handing each one to #prepare_consumer in turn.
def prepare_consumers
  Asynk.consumers.each do |registered|
    prepare_consumer(registered)
  end
end
register_signal_handlers() click to toggle source
# File lib/asynk/server.rb, line 41
# Installs traps for QUIT, TERM and INT (whichever of them this platform
# lists in Signal.list) and resets the main thread's :signal_queue.
#
# Returns the array of signal symbols that were trapped.
def register_signal_handlers
  Thread.main[:signal_queue] = []

  supported = %w[QUIT TERM INT].select { |name| Signal.list.key?(name) }
  supported.map(&:to_sym).each do |sig|
    # This needs to be reentrant, so we queue up signals to be handled
    # in the run loop, rather than acting on signals here
    trap(sig) do
      Thread.main[:signal_queue] << sig
    end
  end
end
workers() click to toggle source
# File lib/asynk/server.rb, line 37
# Returns the list of workers owned by this server, creating an empty
# list on first access and reusing it afterwards.
def workers
  return @workers if @workers

  @workers = []
end