class Newque::Pubsub_client

Public Class Methods

new(host, port, protocol_options:{}) click to toggle source
# File lib/newque/clients/pubsub_client.rb, line 8
def initialize host, port, protocol_options:{}, socket_wait:100
  @ctx = ZMQ::Context.new
  @addr = "tcp://#{host}:#{port}"
  @options = Util.compute_options Zmq_tools::BASE_OPTIONS, protocol_options
  @socket_wait = socket_wait
  @disconnect = false
  @ready

  @listeners = {}
  @error_handlers = []
end

Public Instance Methods

add_error_handler(&block) click to toggle source
# File lib/newque/clients/pubsub_client.rb, line 20
def add_error_handler &block
  @error_handlers << block
end
disconnect() click to toggle source
# File lib/newque/clients/pubsub_client.rb, line 41
def disconnect
  @disconnect = true
  nil
end
subscribe(&block) click to toggle source
# File lib/newque/clients/pubsub_client.rb, line 24
def subscribe &block
  @disconnect = false
  id = SecureRandom.uuid
  @listeners[id] = block
  start_loop unless is_looping?
  thread = Thread.new do
    @ready.join(1)
    id
  end
  Future.new thread, 10
end
unsubscribe(id) click to toggle source
# File lib/newque/clients/pubsub_client.rb, line 36
def unsubscribe id
  @listeners.delete id
  nil
end

Private Instance Methods

is_looping?() click to toggle source
# File lib/newque/clients/pubsub_client.rb, line 91
def is_looping?
  !@thread.nil? && @thread.alive?
end
print_uncaught_exception(err, block_name) click to toggle source
start_loop() click to toggle source

The socket connection happens here so that no network traffic occurs while not subscribed

# File lib/newque/clients/pubsub_client.rb, line 49
def start_loop
  @disconnect = false
  @sock = @ctx.socket ZMQ::SUB
  Zmq_tools.set_zmq_sock_options @sock, @options

  @poller = ZMQ::Poller.new
  @poller.register_readable @sock

  @sock.connect @addr
  @sock.setsockopt(ZMQ::SUBSCRIBE, '')

  @ready = Util.wait_t
  @thread = Thread.new do
    @poller.poll(@socket_wait)
    Util.resolve_t @ready, ''
    loop do
      break if @disconnect
      next if @poller.poll(@socket_wait) == 0
      buffers = []
      @sock.recv_strings buffers, ZMQ::DONTWAIT
      input = Zmq_tools.parse_input buffers
      @listeners.values.each do |listener|
        begin
          listener.(input)
        rescue => listener_error
          print_uncaught_exception(listener_error, 'subscribe') if @error_handlers.size == 0
          @error_handlers.each do |err_handler|
            begin
              err_handler.(listener_error)
            rescue => uncaught_err
              print_uncaught_exception uncaught_err, 'add_error_handler'
            end
          end
        end
      end
    end
    @sock.disconnect @addr
    @sock.close
  end
  @thread.abort_on_exception = true
end