class Protobuf::Rpc::Zmq::Worker

Public Class Methods

new(server, broker) click to toggle source

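Constructor. Stores the given server and broker, then initializes the ZMQ context and the backend socket; if setup raises, the worker is torn down and the error is re-raised.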

# File lib/protobuf/rpc/servers/zmq/worker.rb, line 15
# Builds a worker for the given server and broker, then sets up its ZMQ
# context and backend socket.
#
# server - object kept in @server (its backend_uri is used to connect)
# broker - object kept in @broker (consulted by running?)
#
# Any StandardError raised during setup triggers teardown and is re-raised.
def initialize(server, broker)
  @server = server
  @broker = broker

  begin
    init_zmq_context
    init_backend_socket
  rescue StandardError
    # Release whatever was already created before propagating the failure.
    teardown
    raise
  end
end

Public Instance Methods

process_request() click to toggle source

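Reads one request from the backend socket, handles it, and writes the encoded response back addressed to the requesting client. Does nothing when no data frame was received.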

# File lib/protobuf/rpc/servers/zmq/worker.rb, line 29
# Handles a single request read from the backend socket.
#
# The incoming frames are destructured as [client address, delimiter,
# payload]. When there is no payload frame nothing is handled and nil is
# returned. Otherwise, inside a gc_pause block, the payload is passed to
# handle_request and the encoded result is written back to the backend,
# addressed to the same client.
def process_request
  sender, _delimiter, payload = read_from_backend
  return unless payload

  gc_pause do
    reply = handle_request(payload)
    reply_frames = [sender, ::Protobuf::Rpc::Zmq::EMPTY_STRING, reply]
    write_to_backend(reply_frames)
  end
end
run() click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 39
# Main worker loop.
#
# Registers the worker's sockets with a poller, tells the broker the worker
# is ready, then polls repeatedly:
#   * poll result 0 while running? is false -> stop
#   * poll result -1                        -> stop
#   * poll result > 0                       -> process one request
#
# While a request is being processed the current thread's :busy flag is
# true. The flag is reset to false in an ensure clause so that it does not
# stay true when process_request raises. teardown always runs on exit.
def run
  poller = ::ZMQ::Poller.new
  poller.register_readable(@backend_socket)
  # NOTE(review): @shutdown_socket is not assigned anywhere in the visible
  # part of this class -- confirm it is set elsewhere or drop this line.
  poller.register_readable(@shutdown_socket)

  # Send request to broker telling it we are ready
  write_to_backend([::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE])

  loop do
    rc = poller.poll(500)

    if rc == 0 && !running? # rubocop:disable Style/GuardClause
      break # The server was shutdown and no requests are pending
    elsif rc == -1
      break # Something went wrong
    elsif rc > 0
      begin
        ::Thread.current[:busy] = true
        process_request
      ensure
        # Never leave the thread flagged as busy, even on failure.
        ::Thread.current[:busy] = false
      end
    end
  end
ensure
  teardown
end
running?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 64
# Reports whether the worker should keep serving.
#
# Returns the broker's running? result when it is falsy (the server is not
# asked in that case); otherwise returns the server's running? result.
def running?
  broker_state = @broker.running?
  return broker_state unless broker_state

  @server.running?
end

Private Instance Methods

init_backend_socket() click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 79
# Creates a ZMQ::REQ socket from the worker's context, stores it in
# @backend_socket and connects it to the server's backend_uri. The connect
# result is passed through zmq_error_check, whose result is returned.
def init_backend_socket
  @backend_socket = @zmq_context.socket(ZMQ::REQ)
  connect_rc = @backend_socket.connect(@server.backend_uri)
  zmq_error_check(connect_rc)
end
init_zmq_context() click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 70
# Chooses the ZMQ context for this worker and stores it in @zmq_context:
# the server's own context when inproc? is true, otherwise a newly created
# ZMQ::Context.
def init_zmq_context
  @zmq_context = inproc? ? @server.zmq_context : ZMQ::Context.new
end
inproc?() click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 84
# Asks the server, via try(:inproc?), whether it is an in-process server
# and normalizes the answer to a strict true/false.
def inproc?
  @server.try(:inproc?) ? true : false
end
read_from_backend() click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 88
# Receives a message from the backend socket into a fresh array via
# recv_strings, passes the receive result through zmq_error_check, and
# returns the array of frames.
def read_from_backend
  [].tap do |frames|
    zmq_error_check(@backend_socket.recv_strings(frames))
  end
end
teardown() click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 94
# Releases the worker's ZMQ resources: closes the backend socket if there
# is one, then terminates the context unless inproc? is true.
def teardown
  @backend_socket.try(:close)
  return if inproc?

  @zmq_context.try(:terminate)
end
write_to_backend(frames) click to toggle source
# File lib/protobuf/rpc/servers/zmq/worker.rb, line 99
# Sends the given frames over the backend socket with send_strings and
# passes the send result through zmq_error_check, returning its result.
#
# frames - the array of frames to send
def write_to_backend(frames)
  send_rc = @backend_socket.send_strings(frames)
  zmq_error_check(send_rc)
end