class Protobuf::Rpc::Zmq::Broker

Attributes

local_queue[R]

Public Class Methods

new(server) click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 11
# Builds a broker for the given server and prepares everything it needs
# before #run can be called: the ZMQ context, the local queue, both sockets
# and the poller (in that order).
#
# If any setup step raises a StandardError, whatever was already created is
# released via #teardown and the original error is re-raised.
#
# @param server the owning server; stored in @server and queried by the
#   setup helpers.
def initialize(server)
  @server = server

  begin
    init_zmq_context
    init_local_queue
    init_backend_socket
    init_frontend_socket
    init_poller
  rescue StandardError
    # Do not leave half-initialized sockets/context behind.
    teardown
    raise
  end
end

Public Instance Methods

run() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 24
# Runs the broker loop until the server has shut down (and no workers
# remain) or polling reports an error.
#
# Each iteration drains the local queue, polls the sockets for up to
# #broker_polling_milliseconds, then services the backend, the local queue
# again, and finally the frontend.
#
# On exit (normal or via exception) the sockets are torn down and the
# running flag is cleared. The flag is cleared even if #teardown itself
# raises, so #running? never reports true for a broker that has stopped.
def run
  @idle_workers = []
  @running = true

  loop do
    process_local_queue
    rc = @poller.poll(broker_polling_milliseconds)

    # The server was shutdown and no requests are pending
    break if rc == 0 && !running? && @server.workers.empty?
    # Something went wrong
    break if rc == -1

    check_and_process_backend
    process_local_queue # Fair ordering so queued requests get in before new requests
    check_and_process_frontend
  end
ensure
  begin
    teardown
  ensure
    # Must happen regardless of whether teardown succeeded.
    @running = false
  end
end
running?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 46
# Whether the broker is still active.
#
# @return the falsy value of @running when the broker loop is not active;
#   otherwise whatever @server.running? reports.
def running?
  return @running unless @running

  @server.running?
end

Private Instance Methods

backend_poll_weight() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 52
# Maximum number of backend messages handled per loop pass.
#
# Read once from PB_ZMQ_SERVER_BACKEND_POLL_WEIGHT and memoized; an unset,
# non-numeric or sub-1 value falls back to 1.
#
# @return [Integer] a value >= 1
def backend_poll_weight
  return @backend_poll_weight if @backend_poll_weight

  configured = ENV["PB_ZMQ_SERVER_BACKEND_POLL_WEIGHT"].to_i
  @backend_poll_weight = configured > 1 ? configured : 1
end
broker_polling_milliseconds() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 56
# Timeout, in milliseconds, handed to the poller on each loop pass.
#
# Read once from PB_ZMQ_BROKER_POLLING_MILLISECONDS and memoized. The value
# is floored at 500: anything unset, non-numeric or smaller yields 500.
#
# @return [Integer] a value >= 500
def broker_polling_milliseconds
  return @broker_polling_milliseconds if @broker_polling_milliseconds

  requested = ENV["PB_ZMQ_BROKER_POLLING_MILLISECONDS"].to_i
  @broker_polling_milliseconds = requested < 500 ? 500 : requested
end
check_and_process_backend() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 60
# Handles backend messages while the poller lists the backend socket as
# readable, up to #backend_poll_weight messages per call.
#
# The poller is re-polled (non-blocking) after every message, including the
# last one, so its readables are fresh for whoever inspects them next.
def check_and_process_backend
  handled = 0

  while @poller.readables.include?(@backend_socket) && handled < backend_poll_weight
    handled += 1
    process_backend
    @poller.poll_nonblock
  end
end
check_and_process_frontend() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 72
# Handles frontend messages while the poller lists the frontend socket as
# readable, up to #frontend_poll_weight messages per call.
#
# Stops early as soon as the local queue can take no more work: there is no
# point reading the frontend just to throw messages away, and the backend is
# prioritized while the queue is full.
def check_and_process_frontend
  handled = 0

  while @poller.readables.include?(@frontend_socket) && handled < frontend_poll_weight
    handled += 1
    process_frontend

    # Bail out before re-polling when the queue is full (or we are stopping).
    break unless local_queue_available?

    @poller.poll_nonblock
  end
end
frontend_poll_weight() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 85
# Maximum number of frontend messages handled per loop pass.
#
# Read once from PB_ZMQ_SERVER_FRONTEND_POLL_WEIGHT and memoized; an unset,
# non-numeric or sub-1 value falls back to 1.
#
# @return [Integer] a value >= 1
def frontend_poll_weight
  return @frontend_poll_weight if @frontend_poll_weight

  configured = ENV["PB_ZMQ_SERVER_FRONTEND_POLL_WEIGHT"].to_i
  configured = 1 if configured < 1
  @frontend_poll_weight = configured
end
init_backend_socket() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 89
# Creates the ROUTER socket used on the backend side and binds it to the
# server's backend URI. The bind result is passed to zmq_error_check
# (defined outside this view; presumably raises on a failure code).
def init_backend_socket
  @backend_socket = @zmq_context.socket(ZMQ::ROUTER)
  bind_result = @backend_socket.bind(@server.backend_uri)
  zmq_error_check(bind_result)
end
init_frontend_socket() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 94
# Creates the ROUTER socket used on the frontend side and binds it to the
# server's frontend URI. The bind result is passed to zmq_error_check
# (defined outside this view; presumably raises on a failure code).
def init_frontend_socket
  @frontend_socket = @zmq_context.socket(ZMQ::ROUTER)
  bind_result = @frontend_socket.bind(@server.frontend_uri)
  zmq_error_check(bind_result)
end
init_local_queue() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 99
# Starts with an empty local queue; requests that arrive while no worker is
# idle are appended to it by #process_frontend.
#
# @return [Array] the new, empty queue
def init_local_queue
  @local_queue = Array.new
end
init_poller() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 103
# Creates the poller and registers both sockets for readability, frontend
# first and backend second.
#
# @return the result of the last register_readable call
def init_poller
  @poller = ZMQ::Poller.new
  [@frontend_socket, @backend_socket].map { |socket| @poller.register_readable(socket) }.last
end
init_zmq_context() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 109
# Chooses the ZMQ context for this broker: the server's own context when
# running in-process (see #inproc?), otherwise a brand new one.
def init_zmq_context
  @zmq_context = inproc? ? @server.zmq_context : ZMQ::Context.new
end
inproc?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 118
# Whether the server reports in-process mode.
#
# Uses `try`, so a server that is nil or lacks the predicate counts as not
# in-process (assuming ActiveSupport-style `try` semantics).
#
# @return [Boolean] always strictly true or false
def inproc?
  @server.try(:inproc?) ? true : false
end
local_queue_available?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 122
# Whether the local queue can accept another request: it must be below
# #local_queue_max_size and the broker must still be running.
#
# @return false when the queue is full, otherwise the value of #running?
def local_queue_available?
  return false unless local_queue.size < local_queue_max_size

  running?
end
local_queue_max_size() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 126
# Upper bound on the number of requests held in the local queue.
#
# Read once from PB_ZMQ_SERVER_QUEUE_MAX_SIZE and memoized. The value is
# floored at 5: anything unset, non-numeric or smaller yields 5.
#
# @return [Integer] a value >= 5
def local_queue_max_size
  return @local_queue_max_size if @local_queue_max_size

  requested = ENV["PB_ZMQ_SERVER_QUEUE_MAX_SIZE"].to_i
  @local_queue_max_size = requested > 5 ? requested : 5
end
process_backend() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 130
# Reads one multipart message from the backend. The sender (first frame) is
# recorded as an idle worker; the second frame is discarded.
#
# If the remaining frames are exactly the worker-ready message, nothing else
# happens. Otherwise the remaining frames are forwarded to the frontend.
def process_backend
  worker_id, _delimiter, *payload = read_from_backend
  @idle_workers << worker_id

  # A bare "ready" announcement carries no reply to relay.
  return if payload == [::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE]

  write_to_frontend(payload)
end
process_frontend() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 140
# Reads one multipart message from the frontend and dispatches it.
#
# * An availability check is answered straight back to the sender with
#   WORKERS_AVAILABLE or NO_WORKERS_AVAILABLE, depending on
#   #local_queue_available?.
# * Any other message is a request: it is handed to the next idle worker
#   if there is one, otherwise appended to the local queue.
def process_frontend
  client, _delimiter, message, *extra = read_from_frontend
  delimiter = ::Protobuf::Rpc::Zmq::EMPTY_STRING

  if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE
    status =
      if local_queue_available?
        ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE
      else
        ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE
      end
    write_to_frontend([client, delimiter, status])
  elsif @idle_workers.empty?
    # No worker free right now: park the request envelope.
    local_queue << [client, delimiter, message, *extra]
  else
    # Prefix the request envelope with the chosen worker's address.
    write_to_backend([@idle_workers.shift, delimiter, client, delimiter, message, *extra])
  end
end
process_local_queue() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 158
# Hands queued requests to idle workers, oldest request and longest-idle
# worker first, until either the queue or the idle-worker list runs out.
#
# @return [nil]
def process_local_queue
  until local_queue.empty? || @idle_workers.empty?
    worker = @idle_workers.shift
    request = local_queue.shift
    write_to_backend([worker, ::Protobuf::Rpc::Zmq::EMPTY_STRING].concat(request))
  end
end
read_from_backend() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 166
# Receives one multipart message from the backend socket.
#
# The receive result is passed to zmq_error_check (defined outside this
# view) before the frames are returned.
#
# @return [Array] the frames filled in by recv_strings
def read_from_backend
  [].tap do |frames|
    zmq_error_check(@backend_socket.recv_strings(frames))
  end
end
read_from_frontend() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 172
# Receives one multipart message from the frontend socket.
#
# The receive result is passed to zmq_error_check (defined outside this
# view) before the frames are returned.
#
# @return [Array] the frames filled in by recv_strings
def read_from_frontend
  [].tap do |frames|
    zmq_error_check(@frontend_socket.recv_strings(frames))
  end
end
teardown() click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 178
# Releases the broker's ZMQ resources: closes the frontend socket, then the
# backend socket, and terminates the context unless it is the server's
# shared in-process context.
#
# Every call goes through `try`, so resources that were never created
# (e.g. after a failed #initialize) are skipped, assuming
# ActiveSupport-style `try` on nil.
def teardown
  [@frontend_socket, @backend_socket].each { |socket| socket.try(:close) }

  # The in-process context belongs to the server; leave it alone.
  return if inproc?

  @zmq_context.try(:terminate)
end
write_to_backend(frames) click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 184
# Sends a multipart message on the backend socket and passes the send
# result to zmq_error_check (defined outside this view).
#
# @param frames [Array] the frames to send, in order
def write_to_backend(frames)
  send_result = @backend_socket.send_strings(frames)
  zmq_error_check(send_result)
end
write_to_frontend(frames) click to toggle source
# File lib/protobuf/rpc/servers/zmq/broker.rb, line 188
# Sends a multipart message on the frontend socket and passes the send
# result to zmq_error_check (defined outside this view).
#
# @param frames [Array] the frames to send, in order
def write_to_frontend(frames)
  send_result = @frontend_socket.send_strings(frames)
  zmq_error_check(send_result)
end