class HonestPubsub::Server::SubscriberServer
Public Class Methods
new(subscribers)
click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 20
# Builds one queue listener per subscriber and keeps them in @workers.
#
# @param subscribers [Enumerable] subscribers, each handed to
#   create_queue_listeners in order
def initialize(subscribers)
  @workers = subscribers.map do |subscriber_entry|
    create_queue_listeners(subscriber_entry)
  end
end
Public Instance Methods
start()
click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 24
# Starts every worker, blocks the calling thread until one of the HUP, INT,
# QUIT, ABRT or TERM signals is received, then shuts every worker down.
#
# A failure while shutting down one worker is logged and does not prevent the
# remaining workers from being shut down. The pid is removed through
# HonestPubsub::CLI on every exit path, including when an exception escapes.
def start
  @workers.each do |worker|
    puts "Starting worker: #{worker.worker_class.name}"
    worker.start
  end

  thread = Thread.current
  shutdown_requested = false

  %w[HUP INT QUIT ABRT TERM].each do |signal_name|
    Signal.trap(signal_name) do
      puts "Processing #{signal_name}"
      # Record the request before waking the thread, so a signal handled
      # before the thread goes to sleep is never lost.
      shutdown_requested = true
      thread.run
    end
  end

  # Wait on a flag with a bounded sleep rather than Thread.stop: Thread.stop
  # could miss a wakeup issued before the thread was asleep, and it raises
  # ThreadError when no other thread is alive. thread.run in the handler still
  # wakes the sleep early; the one-second bound only covers the rare case where
  # the signal lands between the flag check and the sleep.
  sleep(1) until shutdown_requested

  ::HonestPubsub::Logger.new.log_service("all_services", :warn, "Starting shutdown of all services")

  @workers.each do |worker|
    ::HonestPubsub::Logger.new.log_service("all_services", :warn, "Tearing down worker: #{worker.worker_class.name}")
    begin
      STDOUT.puts "Tearing down subscriber for #{worker.worker_class.name}"
      worker.shutdown
    rescue => e
      # Best effort: log the failure and continue with the next worker.
      ::HonestPubsub::Logger.new.log_service("all_services", :warn, "#{worker.worker_class.name} - did not tear down correctly. Error - #{e.message}")
    end
  end
ensure
  HonestPubsub::CLI.instance.remove_pid
end
Private Instance Methods
create_queue_listeners(subscriber)
click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 66
# Validates a subscriber's routing key and queue name, then wraps the
# subscriber in a ClientQueueListener.
#
# @param subscriber [#subscribed_key, #subscribed_queue, #name] the subscriber
#   to listen for
# @return [ClientQueueListener] listener built from the subscriber, its routing
#   key and its queue name
# @raise [ArgumentError] when the routing key or the queue name is blank
def create_queue_listeners(subscriber)
  # Both values are read up front, before either one is validated.
  routing_key = subscriber.subscribed_key
  subscribed_queue_name = subscriber.subscribed_queue

  raise ArgumentError, "Routing key must be provided in #{subscriber.name} using `subscribe_to routing_key`" if routing_key.blank?
  raise ArgumentError, "Queue Name must be provided in #{subscriber.name} using `subscribe_to routing_key, on: queue_name`" if subscribed_queue_name.blank?

  STDOUT.puts "Setting up listener for request_key: #{routing_key} and queue:#{subscribed_queue_name}"
  ClientQueueListener.new(subscriber, routing_key, subscribed_queue_name)
end
warn(message)
click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 58
# Emits +message+ through ActiveSupport::Deprecation with the deprecation
# behavior temporarily set to :stderr and :log, then puts back whatever
# behavior was configured before the call.
#
# Note: being an instance method named +warn+, this takes precedence over
# Kernel#warn for calls made inside this class.
#
# @param message [String] text passed to ActiveSupport::Deprecation.warn
def warn(message)
  previous_behavior = ActiveSupport::Deprecation.behavior
  ActiveSupport::Deprecation.behavior = %i[stderr log]
  ActiveSupport::Deprecation.warn(message)
ensure
  # Runs on every exit path so the override never outlives this call.
  ActiveSupport::Deprecation.behavior = previous_behavior
end