class Aggro::ZeroMQTransport::Server
Public: Server
to handle messages from nanomsg clients.
Constants
- DEFAULT_WORKER_COUNT
Public Class Methods
new(endpoint, callable = nil, &block)
click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 20 def initialize(endpoint, callable = nil, &block) fail ArgumentError unless callable || block_given? @callable = block_given? ? block : callable @endpoint = endpoint @inproc_endpoint = "inproc://aggro-server-#{SecureRandom.hex}" @reply_mutex = Mutex.new @work_queue = Queue.new end
Public Instance Methods
start()
click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 30 def start fail ServerAlreadyRunning if @running @running = true start_master DEFAULT_WORKER_COUNT.times { start_worker } self end
stop()
click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 40 def stop return self unless @running @running = false self end
Private Instance Methods
enqueue_request(socket)
click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 50 def enqueue_request(socket) id = '' delimiter = '' message = '' socket.recv_string id socket.recv_string delimiter socket.recv_string message @work_queue << Workload.new(id, message, socket, @reply_mutex) end
respond_to_request(workload)
click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 62 def respond_to_request(workload) return if workload.nil? response = '00' response = @callable.call(workload.message) ensure workload.finish response unless workload.nil? end
start_master()
click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 71 def start_master Concurrent::SingleThreadExecutor.new.post do socket = ZeroMQTransport.context.socket(ZMQ::XREP) poller = ZeroMQ::Poller.new poller.register_readable socket socket.setsockopt ZMQ::LINGER, ZeroMQTransport.linger socket.bind @endpoint (enqueue_request socket while poller.poll(1) > 0) while @running socket.close end end
start_worker()
click to toggle source
# File lib/aggro/zeromq_transport/server.rb, line 85 def start_worker Concurrent::SingleThreadExecutor.new.post do respond_to_request @work_queue.pop while @running end end