class RemoteService::Queue

Constants

EXIT_SIGNALS

Public Instance Methods

connect(brokers, &block) click to toggle source
# File lib/remote_service/queue.rb, line 11
# Creates the NATS connector and starts it.
#
# brokers - broker list handed to Connector::Nats.new. When nil (or false),
#           the REMOTE_SERVICE_BROKERS environment variable is split on
#           commas instead, defaulting to 'nats://127.0.0.1:4222'.
# block   - forwarded unchanged to the connector's #start.
#
# Returns whatever the connector's #start returns.
def connect(brokers, &block)
  unless brokers
    configured = ENV.fetch('REMOTE_SERVICE_BROKERS', 'nats://127.0.0.1:4222')
    brokers = configured.split(',')
  end

  @conn = Connector::Nats.new(brokers)
  @conn.start(&block)
end
publish(queue, payload) click to toggle source
# File lib/remote_service/queue.rb, line 36
# Encodes +payload+ and publishes it on +queue+ through the current
# connection.
#
# Returns whatever the connection's #publish returns.
def publish(queue, payload)
  packed = encode(payload)
  @conn.publish(queue, packed)
end
request(queue, payload) { |data| ... } click to toggle source
# File lib/remote_service/queue.rb, line 25
# Sends an encoded request on +queue+ and hands the decoded response to the
# caller's block.
#
# queue   - name of the service queue to send the request to.
# payload - object to encode and send.
#
# Yields the decoded response data when a block was given; without a block
# the response is only logged.
#
# Returns whatever the connection's #request returns.
def request(queue, payload)
  # Captured up front: the response callback may run long after this method
  # has returned, and a bare `yield` there would raise LocalJumpError for
  # callers that passed no block.
  wants_response = block_given?

  RemoteService.logger.debug "REQUEST - SERVICE:[#{queue}] PAYLOAD:[#{payload}]"

  # Monotonic clock: wall-clock time can jump (NTP, manual changes) and
  # would then report bogus or negative round-trip times.
  sent_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)

  @conn.request(queue, encode(payload)) do |response|
    data = decode(response)
    response_time = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - sent_at
    RemoteService.logger.debug "RESPONSE - SERVICE:[#{queue}] PAYLOAD:[#{data}] TIME:[#{response_time}ms]"
    yield(data) if wants_response
  end
end
service(service_handler, workers, monitor_interval) click to toggle source
# File lib/remote_service/queue.rb, line 17
# Registers +service_handler+, builds the worker pool and starts the
# service subscriber.
#
# service_handler  - object stored as the handler for incoming requests.
# workers          - worker count; when nil the REMOTE_SERVICE_WORKERS
#                    environment variable is used, defaulting to 4.
# monitor_interval - monitor interval; when nil the
#                    REMOTE_SERVICE_MONITOR_INTERVAL environment variable is
#                    used, defaulting to 5.
#
# Both numeric settings are converted with #to_i before being passed to
# WorkerPool.new.
#
# Returns whatever start_service_subscriber returns.
def service(service_handler, workers, monitor_interval)
  worker_count = workers || ENV.fetch('REMOTE_SERVICE_WORKERS', 4)
  interval = monitor_interval || ENV.fetch('REMOTE_SERVICE_MONITOR_INTERVAL', 5)

  @worker_pool = WorkerPool.new(worker_count.to_i, interval.to_i)
  @service_handler = service_handler
  start_service_subscriber
end

Private Instance Methods

decode(payload) click to toggle source
# File lib/remote_service/queue.rb, line 74
# Unpacks a MessagePack-encoded +payload+.
#
# Returns the result of MessagePack.unpack.
def decode(payload)
  MessagePack.unpack(payload)
end
encode(payload) click to toggle source
# File lib/remote_service/queue.rb, line 70
# Packs +payload+ with MessagePack.
#
# Returns the result of MessagePack.pack.
def encode(payload)
  MessagePack.pack(payload)
end
service?() click to toggle source
# File lib/remote_service/queue.rb, line 62
# Tells whether a service handler has been registered.
#
# Returns true when @service_handler is set to anything other than nil,
# false otherwise.
def service?
  !@service_handler.nil?
end
service_queue_name() click to toggle source
# File lib/remote_service/queue.rb, line 66
# Looks up the queue name declared on the registered handler's class.
#
# Returns the handler class's queue_name, or nil when no handler is set.
def service_queue_name
  return unless @service_handler

  @service_handler.class.queue_name
end
setup_signal_handlers() click to toggle source
# File lib/remote_service/queue.rb, line 78
# Installs a trap for every signal listed in EXIT_SIGNALS.
#
# When one of those signals arrives, the handler asks the worker pool to
# exit if a service handler is registered, otherwise asks the connection to
# exit, and then stops the EventMachine reactor via EM.stop.
#
# Returns EXIT_SIGNALS (the receiver of #each).
def setup_signal_handlers
  EXIT_SIGNALS.each do |signal|
    trap(signal) do
      if service?
        @worker_pool.exit
      else
        @conn.exit
      end
      EM.stop
    end
  end
end
start_service_subscriber() click to toggle source
# File lib/remote_service/queue.rb, line 42
# Starts the worker pool and subscribes to the service queue.
#
# Every message delivered by the subscription is passed to the worker pool,
# whose job block decodes the request and passes the payload and reply
# address to the service handler. If decoding or handling raises a
# StandardError, the error is logged and an error reply (nil result plus the
# exception's class name, message and backtrace) is published to the reply
# address through Queue.instance.
#
# Returns whatever the connection's #subscribe returns.
def start_service_subscriber
  RemoteService.logger.debug "SERVICE QUEUE: #{service_queue_name}"
  @worker_pool.start

  @conn.subscribe(service_queue_name) do |*message|
    @worker_pool.run(*message) do |request, reply_to|
      begin
        payload = decode(request)
        RemoteService.logger.debug "FETCHED - REPLY_TO:[#{reply_to}] PAYLOAD:[#{payload}]"
        @service_handler.handle(payload, reply_to)
      rescue => error
        RemoteService.logger.error(error)
        # Report the failure back to the requester rather than leaving it
        # without any reply.
        Queue.instance.publish(
          reply_to,
          {
            result: nil,
            error: {
              name: error.class.name,
              message: error.message,
              backtrace: error.backtrace
            }
          }
        )
      end
    end
  end
end