class Newque::Newque_zmq
Attributes
sock[R]
Public Class Methods
new(host, port, options, timeout)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 9
# Creates a ZMQ DEALER socket, connects it to "tcp://host:port" and starts
# the background thread that receives replies.
#
# @param host    interpolated into the "tcp://host:port" endpoint
# @param port    interpolated into the "tcp://host:port" endpoint
# @param options passed to Util.compute_options together with Zmq_tools::BASE_OPTIONS
# @param timeout divided by 1000.0 before being stored in @timeout
#   NOTE(review): presumably milliseconds in, seconds stored -- confirm.
def initialize(host, port, options, timeout)
  @ctx = ZMQ::Context.new
  @options = Util.compute_options(Zmq_tools::BASE_OPTIONS, options)
  @timeout = timeout / 1000.0

  @sock = @ctx.socket(ZMQ::DEALER)
  Zmq_tools.set_zmq_sock_options(@sock, @options)

  @poller = ZMQ::Poller.new
  @poller.register_readable(@sock)

  # Request id => thread waiting for the reply carrying that id.
  @router = {}

  @sock.connect("tcp://#{host}:#{port}")
  start_loop
  @thread
end
Public Instance Methods
count(channel)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 46
# Sends a count request for +channel+.
#
# The block handed to send_request parses the first reply frame as
# :count_output and wraps its +count+ field in a Count_response.
#
# @param channel channel value placed in the Input message
# @return whatever send_request returns
def count(channel)
  request = Input.new(channel: channel, count_input: Count_Input.new)
  send_request(request) do |frames|
    reply = parse_response(frames[0], :count_output)
    Count_response.new(reply.count)
  end
end
delete(channel)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 54
# Sends a delete request for +channel+.
#
# @param channel channel value placed in the Input message
# @return whatever send_request returns; the block yields a Delete_response
def delete(channel)
  request = Input.new(channel: channel, delete_input: Delete_Input.new)
  send_request(request) do |frames|
    # The parsed value is not needed; the call is kept because
    # parse_response raises when the reply carries errors.
    parse_response(frames[0], :delete_output)
    Delete_response.new
  end
end
health(channel, global=false)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 62
# Sends a health request for +channel+.
#
# @param channel channel value placed in the Input message
# @param global  forwarded as the +global+ field of Health_Input (default false)
# @return whatever send_request returns; the block yields a Health_response
def health(channel, global = false)
  health_input = Health_Input.new(global: global)
  request = Input.new(channel: channel, health_input: health_input)
  send_request(request) do |frames|
    # The parsed value is not needed; the call is kept because
    # parse_response raises when the reply carries errors.
    parse_response(frames[0], :health_output)
    Health_response.new
  end
end
read(channel, mode, limit=nil)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 34
# Sends a read request for +channel+.
#
# The first reply frame is parsed as :read_output; every remaining frame is
# passed through untouched as the messages of the Read_response.
#
# @param channel channel value placed in the Input message
# @param mode    forwarded as the +mode+ field of Read_Input
# @param limit   forwarded as the +limit+ field of Read_Input (default nil)
# @return whatever send_request returns
def read(channel, mode, limit = nil)
  read_input = Read_Input.new(mode: mode, limit: limit)
  request = Input.new(channel: channel, read_input: read_input)
  send_request(request) do |head, *messages|
    reply = parse_response(head, :read_output)
    Read_response.new(reply.length, reply.last_id, reply.last_timens, messages)
  end
end
read_stream(channel, mode, limit=nil)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 42
# Not supported by this client: always raises.
#
# The parameters are accepted but never used.
#
# @raise [NewqueError] unconditionally
def read_stream(channel, mode, limit = nil)
  raise NewqueError, "Read_stream is only available in :http mode"
end
write(channel, atomic, msgs, ids=nil)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 26
# Sends a write request for +channel+, with +msgs+ as the extra frames.
#
# The block handed to send_request parses the first reply frame as
# :write_output and wraps its +saved+ field in a Write_response.
#
# @param channel channel value placed in the Input message
# @param atomic  forwarded as the +atomic+ field of Write_Input
# @param msgs    passed to send_request as the message frames
# @param ids     forwarded as the +ids+ field of Write_Input (default nil)
# @return whatever send_request returns
def write(channel, atomic, msgs, ids = nil)
  write_input = Write_Input.new(atomic: atomic, ids: ids)
  request = Input.new(channel: channel, write_input: write_input)
  send_request(request, msgs) do |frames|
    reply = parse_response(frames[0], :write_output)
    Write_response.new(reply.saved)
  end
end
Private Instance Methods
parse_response(buffer, type)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 111
# Decodes +buffer+ with Output.decode and returns the field named by +type+.
#
# @param buffer encoded reply handed to Output.decode
# @param type   name of the method invoked on the decoded Output
# @raise the value of Util.newque_error(errors) when the decoded output's
#   +errors+ is truthy
def parse_response(buffer, type)
  decoded = Output.decode(buffer)
  raise Util.newque_error(decoded.errors) if decoded.errors

  decoded.send(type)
end
register(id) { |result| ... }
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 101
# Creates a parked thread for request +id+ and records it in @router.
#
# Once woken, the thread removes its own @router entry and yields its
# :result thread variable to the caller's block.
#
# @param id key under which the thread is stored in @router
# @yield the thread's :result thread variable
# @return [Thread] the newly created thread
def register(id)
  waiter = Thread.new do
    # Park here until another thread wakes this one up.
    Thread.stop
    @router.delete(id)
    yield Thread.current.thread_variable_get(:result)
  end
  Thread.pass # Give a hint to schedule the new thread now
  @router[id] = waiter
end
send_request(input, msgs=[], async:false, &block)
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 72
# Registers a waiter for a fresh request id, sends the request frames and
# returns a Future wrapping the waiter thread.
#
# Frames sent are [id, encoded input] followed by +msgs+ when it is
# non-empty. The socket send uses ZMQ::DONTWAIT.
#
# @param input message object; +input.encode.to_s+ becomes the second frame
# @param msgs  extra frames appended after the first two (default [])
# @param async accepted but not used by this method
# @param block forwarded to register
# @return [Future] built from the waiter thread and @timeout
def send_request(input, msgs = [], async: false, &block)
  request_id = SecureRandom.uuid
  payload = input.encode.to_s
  waiter = register(request_id, &block)

  frames = [request_id, payload]
  frames += msgs if msgs.size > 0
  @sock.send_strings(frames, ZMQ::DONTWAIT)

  Future.new(waiter, @timeout)
end
start_loop()
click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 80
# Starts the background thread (stored in @thread) that receives replies.
#
# For every reply the first frame is used as the request id to look up the
# waiting thread in @router; the remaining frames are stored in that
# thread's :result thread variable and the thread is woken up.
# The loop ends when the poller returns a value that is not greater than 0.
def start_loop
  @thread = Thread.new do
    while @poller.poll(:blocking) > 0
      buffers = []
      @sock.recv_strings(buffers, ZMQ::DONTWAIT)
      id, *frames = buffers
      thread = @router[id]

      # No waiter is registered for this id (nothing was received, or the id
      # is unknown or was already handled). Skip the frames: calling #status
      # on nil would raise in this thread and, because of abort_on_exception
      # below, re-raise in the main thread.
      next if thread.nil?

      while thread.status == 'run'
        # If the scheduler didn't run the other thread and execute its Thread.stop
        # then we have to wait before we can continue. Sleep 0 yields to the scheduler.
        sleep 0
      end
      thread.thread_variable_set(:result, frames)
      thread.run if thread.status == 'sleep' # 'if' not necessary, it's a sanity check
    end
  end
  @thread.abort_on_exception = true
end