class RemoteService::Queue
Constants
- EXIT_SIGNALS
Public Instance Methods
connect(brokers, &block)
click to toggle source
# File lib/remote_service/queue.rb, line 11 def connect(brokers, &block) brokers ||= ENV.fetch('REMOTE_SERVICE_BROKERS', 'nats://127.0.0.1:4222').split(',') @conn = Connector::Nats.new(brokers) @conn.start(&block) end
publish(queue, payload)
click to toggle source
# File lib/remote_service/queue.rb, line 36 def publish(queue, payload) @conn.publish(queue, encode(payload)) end
request(queue, payload) { |data| ... }
click to toggle source
# File lib/remote_service/queue.rb, line 25 def request(queue, payload) RemoteService.logger.debug "REQUEST - SERVICE:[#{queue}] PAYLOAD:[#{payload}]" sent_at = Time.now.utc @conn.request(queue, encode(payload)) do |response| data = decode(response) response_time = (Time.now.utc - sent_at)*1000 RemoteService.logger.debug "RESPONSE - SERVICE:[#{queue}] PAYLOAD:[#{data}] TIME:[#{response_time}ms]" yield(data) end end
service(service_handler, workers, monitor_interval)
click to toggle source
# File lib/remote_service/queue.rb, line 17 def service(service_handler, workers, monitor_interval) workers ||= ENV.fetch('REMOTE_SERVICE_WORKERS', 4) monitor_interval ||= ENV.fetch('REMOTE_SERVICE_MONITOR_INTERVAL', 5) @worker_pool = WorkerPool.new(workers.to_i, monitor_interval.to_i) @service_handler = service_handler start_service_subscriber end
Private Instance Methods
decode(payload)
click to toggle source
# File lib/remote_service/queue.rb, line 74 def decode(payload) MessagePack.unpack(payload) end
encode(payload)
click to toggle source
# File lib/remote_service/queue.rb, line 70 def encode(payload) MessagePack.pack(payload) end
service?()
click to toggle source
# File lib/remote_service/queue.rb, line 62 def service? @service_handler != nil end
service_queue_name()
click to toggle source
# File lib/remote_service/queue.rb, line 66 def service_queue_name @service_handler.class.queue_name if @service_handler end
setup_signal_handlers()
click to toggle source
# File lib/remote_service/queue.rb, line 78 def setup_signal_handlers EXIT_SIGNALS.each do |sig| trap(sig) do @worker_pool.exit if service? @conn.exit if !service? EM.stop end end end
start_service_subscriber()
click to toggle source
# File lib/remote_service/queue.rb, line 42 def start_service_subscriber RemoteService.logger.debug "SERVICE QUEUE: #{service_queue_name}" @worker_pool.start @conn.subscribe(service_queue_name) do |*args| @worker_pool.run(*args) do |request, reply_to| begin payload = decode(request) RemoteService.logger.debug "FETCHED - REPLY_TO:[#{reply_to}] PAYLOAD:[#{payload}]" @service_handler.handle(payload, reply_to) rescue => e RemoteService.logger.error(e) Queue.instance.publish( reply_to, {result: nil, error: {name: e.class.name, message: e.message, backtrace: e.backtrace}}, ) end end end end