class Aggro::ZeroMQTransport::Server

Public: Server to handle messages from ZeroMQ clients.

Constants

DEFAULT_WORKER_COUNT

Public Class Methods

new(endpoint, callable = nil, &block) click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 20
# Public: Build a server for the given endpoint and request handler.
#
# endpoint - The endpoint the master socket is later bound to.
# callable - An object responding to #call that receives each request
#            message; used only when no block is given (default: nil).
# block    - Optional handler block; takes precedence over callable.
#
# Raises ArgumentError if neither a callable nor a block is supplied.
def initialize(endpoint, callable = nil, &block)
  handler = block || callable
  raise ArgumentError unless handler

  @callable = handler
  @endpoint = endpoint
  @inproc_endpoint = "inproc://aggro-server-#{SecureRandom.hex}"
  @reply_mutex = Mutex.new
  @work_queue = Queue.new
end

Public Instance Methods

start() click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 30
# Public: Mark the server as running, then launch the master task and
# DEFAULT_WORKER_COUNT worker tasks.
#
# Returns self.
# Raises ServerAlreadyRunning if the server has already been started.
def start
  raise ServerAlreadyRunning if @running

  tap do
    # The flag must be set first: the master and worker loops only run
    # while @running is truthy.
    @running = true
    start_master
    DEFAULT_WORKER_COUNT.times { start_worker }
  end
end
stop() click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 40
# Public: Stop the server. Does nothing if the server is not running.
#
# Clears the running flag so the master and worker loops exit, then wakes
# any worker that is blocked waiting for work.
#
# Returns self.
def stop
  return self unless @running

  @running = false

  # Workers block inside @work_queue.pop and only re-check @running after a
  # pop returns, so without a wake-up they would wait forever. A nil workload
  # is skipped by respond_to_request, making it a safe wake-up value; one is
  # pushed per worker started by #start.
  DEFAULT_WORKER_COUNT.times { @work_queue << nil }

  self
end

Private Instance Methods

enqueue_request(socket) click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 50
# Private: Read one three-frame request (id, delimiter, message) from the
# socket and queue it as a Workload for the workers.
#
# socket - The socket to read from; it is also handed to the Workload
#          together with the shared reply mutex.
#
# Returns the work queue.
def enqueue_request(socket)
  # Each recv_string call is handed its own empty string as a buffer; the
  # frames are read in order and the delimiter frame is discarded.
  frames = Array.new(3) { '' }
  frames.each { |frame| socket.recv_string frame }

  id, _delimiter, message = frames
  @work_queue << Workload.new(id, message, socket, @reply_mutex)
end
respond_to_request(workload) click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 62
# Private: Run the handler for one workload and send back its reply.
#
# workload - The Workload to process; nil is ignored.
#
# If the handler raises, the workload is still finished with the fallback
# reply '00' and the exception then propagates to the caller.
#
# Returns the handler's result, or nil when workload is nil.
def respond_to_request(workload)
  unless workload.nil?
    reply = '00'

    begin
      reply = @callable.call(workload.message)
    ensure
      # Always answer the client, even when the handler blew up.
      workload.finish reply
    end
  end
end
start_master() click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 71
# Private: Post the master loop onto its own single-thread executor.
#
# The task creates an XREP socket, binds it to @endpoint and, while the
# server is running, polls it and enqueues every readable request. The
# socket is closed when the loop ends, whether it ends normally or because
# of an exception.
#
# Returns the result of the executor's #post call.
def start_master
  Concurrent::SingleThreadExecutor.new.post do
    socket = ZeroMQTransport.context.socket(ZMQ::XREP)

    begin
      # NOTE(review): every other ZeroMQ constant here lives under ZMQ::;
      # confirm that ZeroMQ::Poller resolves in this file.
      poller = ZeroMQ::Poller.new
      poller.register_readable socket
      socket.setsockopt ZMQ::LINGER, ZeroMQTransport.linger
      socket.bind @endpoint

      # Drain everything that is readable, then re-check the running flag.
      # poll(1) presumably uses a short timeout so that #stop is noticed
      # promptly; confirm the unit against the poller in use.
      while @running
        enqueue_request socket while poller.poll(1) > 0
      end
    ensure
      # Release the socket even if setup, polling or enqueueing raised.
      socket.close
    end
  end
end
start_worker() click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 85
# Private: Post a worker loop onto its own single-thread executor.
#
# While the server is running, the task pops workloads off the work queue
# and responds to each one in turn.
#
# Returns the result of the executor's #post call.
def start_worker
  executor = Concurrent::SingleThreadExecutor.new

  executor.post do
    while @running
      respond_to_request(@work_queue.pop)
    end
  end
end