class Newque::Fifo_client
Constants
- STATES
Public Class Methods
new(host, port, protocol_options:{})
click to toggle source
# File lib/newque/clients/fifo_client.rb, line 9 def initialize host, port, protocol_options:{}, socket_wait:100 @ctx = ZMQ::Context.new @addr = "tcp://#{host}:#{port}" @options = Util.compute_options Zmq_tools::BASE_OPTIONS, protocol_options @socket_wait = socket_wait @state = 0 end
Public Instance Methods
connect(&block)
click to toggle source
# File lib/newque/clients/fifo_client.rb, line 17 def connect &block raise NewqueError.new "Cannot connect because this client is #{current_state}" unless current_state == :NEW next_state :CONNECTING @handler = block @sock = @ctx.socket ZMQ::DEALER Zmq_tools.set_zmq_sock_options @sock, @options @poller = ZMQ::Poller.new @poller.register_readable @sock @sock.connect @addr ready = Util.wait_t @thread = Thread.new do next_state :RUNNING @poller.poll(@socket_wait) Util.resolve_t ready, '' loop do if current_state == :DISCONNECTING @sock.disconnect @addr @socket_wait = 0 next_state :CLOSING end if @poller.poll(@socket_wait) == 0 if current_state == :CLOSING @sock.close break else next end end # RECEIVING INCOMING MESSAGE buffers = [] @sock.recv_strings buffers, ZMQ::DONTWAIT id, *frames = buffers parsed = Zmq_tools.parse_input frames response = begin returned = @handler.(parsed) unless returned.respond_to?(:serialize) raise NewqueError.new "Block given to Fifo_client.connect returned #{returned.class} which is not a valid response object" end serialized = returned.serialize messages = serialized[:messages] serialized.delete :messages { output: Output.new(serialized.merge!(errors: [])), messages: messages } rescue => handler_error { output: Output.new(errors: [handler_error.to_s], error_output: Error_Output.new), messages: [] } end @sock.send_strings ([id, response[:output].encode.to_s] + response[:messages]), ZMQ::DONTWAIT end next_state :CLOSED end @thread.abort_on_exception = true Future.new ready, 10 end
disconnect()
click to toggle source
# File lib/newque/clients/fifo_client.rb, line 81 def disconnect state = current_state if state == :NEW nil elsif state == :RUNNING next_state :DISCONNECTING else raise NewqueError.new "Can't disconnect since the Fifo_client is currently #{state}" end nil end
Private Instance Methods
current_state()
click to toggle source
# File lib/newque/clients/fifo_client.rb, line 95 def current_state STATES[@state] end
next_state(should_be)
click to toggle source
# File lib/newque/clients/fifo_client.rb, line 99 def next_state should_be goes_to = STATES[@state + 1] unless goes_to == should_be raise NewqueError.new "Inconsistent state in Fifo_client. #{goes_to} should be #{should_be}" end @state = @state + 1 end