class Zeromem::Ws::WsHeavy

Public Class Methods

new(args = {}) click to toggle source

WS initializer @param [Hash] args the options to create the web socket node with @option args [String] :host the hostname/ip which to listen on @option args [Integer] :port the port which to listen on

Calls superclass method
# File lib/zeromem/ws.rb, line 12
def initialize(args = {})
  super(args)
end

Public Instance Methods

invoke(uri, rpc_method, *args) click to toggle source

Instructs node to send rpc request, and wait for / return response

Implementation of Zeromem::Ws::WsHeavy#invoke This is custom implementation of RJR::Node#invoke to make things work perfectly under heavy load (thousands of ws queries per min)

Do not invoke directly from em event loop or callback as will block the message subscription used to receive responses

@param [String] uri location of node to send request to, should be

in format of ws://hostname:port

@param [String] rpc_method json-rpc method to invoke on destination @param [Array] args array of arguments to convert to json and invoke remote method wtih

# File lib/zeromem/ws.rb, line 29
def invoke(uri, rpc_method, *args)
  message = RJR::Messages::Request.new :method => rpc_method,
  :args   => args,
  :headers => @message_headers

  @@em.schedule {
    init_client(uri) do |c|
      c.stream { |msg| handle_message(msg.data, c) }

      c.send_msg message.to_s
    end
  }

  # TODO optional timeout for response ?
  # this cause resource leak
  #result = wait_for_result(message)

  result = wait_for_result_custom(message)

  if result.size > 2
    fail result[2]
  end
  return result[1]
end

Private Instance Methods

wait_for_result_custom(message) click to toggle source
# File lib/zeromem/ws.rb, line 55
def wait_for_result_custom(message)
  res = nil
  message_id = message.msg_id
  @pending[message_id] = Time.now
  while res.nil?
    @response_lock.synchronize{
      # Prune messages that timed out
      if @timeout
        now = Time.now
        @pending.delete_if { |_, start_time| (now - start_time) > @timeout }
      end
      pending_ids = @pending.keys
      fail 'Timed out' unless pending_ids.include? message_id

      # Prune invalid responses
      @responses.keep_if { |response| @pending.has_key? response.first }
      res = @responses.find { |response| message.msg_id == response.first }
      if !res.nil?
        @responses.delete(res)
        @pending.delete(message_id)
      else
        @response_cv.wait @response_lock, @wait_interval
      end
    }
  end
  return res
end