class Newque::Newque_zmq

Attributes

sock[R]

Public Class Methods

new(host, port, options, timeout) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 9
def initialize host, port, options, timeout
  @ctx = ZMQ::Context.new
  @options = Util.compute_options Zmq_tools::BASE_OPTIONS, options
  @timeout = timeout / 1000.0

  @sock = @ctx.socket ZMQ::DEALER
  Zmq_tools.set_zmq_sock_options @sock, @options

  @poller = ZMQ::Poller.new
  @poller.register_readable @sock

  @router = {}
  @sock.connect "tcp://#{host}:#{port}"
  start_loop
  @thread
end

Public Instance Methods

count(channel) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 46
def count channel
  input = Input.new(channel: channel, count_input: Count_Input.new)
  send_request input do |buffers|
    output = parse_response buffers[0], :count_output
    Count_response.new output.count
  end
end
delete(channel) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 54
def delete channel
  input = Input.new(channel: channel, delete_input: Delete_Input.new)
  send_request input do |buffers|
    output = parse_response buffers[0], :delete_output
    Delete_response.new
  end
end
health(channel, global=false) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 62
def health channel, global=false
  input = Input.new(channel: channel, health_input: Health_Input.new(global: global))
  send_request input do |buffers|
    output = parse_response buffers[0], :health_output
    Health_response.new
  end
end
read(channel, mode, limit=nil) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 34
def read channel, mode, limit=nil
  input = Input.new(channel: channel, read_input: Read_Input.new(mode: mode, limit: limit))
  send_request input do |buf, *messages|
    output = parse_response buf, :read_output
    Read_response.new output.length, output.last_id, output.last_timens, messages
  end
end
read_stream(channel, mode, limit=nil) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 42
def read_stream channel, mode, limit=nil
  raise NewqueError.new "Read_stream is only available in :http mode"
end
write(channel, atomic, msgs, ids=nil) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 26
def write channel, atomic, msgs, ids=nil
  input = Input.new(channel: channel, write_input: Write_Input.new(atomic: atomic, ids: ids))
  send_request input, msgs do |buffers|
    output = parse_response buffers[0], :write_output
    Write_response.new output.saved
  end
end

Private Instance Methods

parse_response(buffer, type) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 111
def parse_response buffer, type
  output = Output.decode buffer
  if output.errors
    raise Util.newque_error output.errors
  end
  output.send type
end
register(id) { |current.thread_variable_get(:result)| ... } click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 101
def register id
  thread = Thread.new do
    Thread.stop
    @router.delete id
    yield Thread.current.thread_variable_get(:result)
  end
  Thread.pass # Give a hint to schedule the new thread now
  @router[id] = thread
end
send_request(input, msgs=[], async:false, &block) click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 72
def send_request input, msgs=[], async:false, &block
  id = SecureRandom.uuid
  meta = input.encode.to_s
  thread = register id, &block
  @sock.send_strings (msgs.size > 0 ? [id, meta] + msgs : [id, meta]), ZMQ::DONTWAIT
  Future.new thread, @timeout
end
start_loop() click to toggle source
# File lib/newque/zmq/newque_zmq.rb, line 80
def start_loop
  @thread = Thread.new do
    while @poller.poll(:blocking) > 0
      buffers = []
      @sock.recv_strings buffers, ZMQ::DONTWAIT
      id, *frames = buffers

      thread = @router[id]
      while thread.status == 'run'
        # If the scheduler didn't run the other thread and execute its Thread.stop
        # then we have to wait before we can continue. Sleep 0 yields to the scheduler.
        sleep 0
      end

      thread.thread_variable_set :result, frames
      thread.run if thread.status == 'sleep' # 'if' not necessary, it's a sanity check
    end
  end
  @thread.abort_on_exception = true
end