class Nsqrb::Consumer

Constants

PROTOCOL_VERSION
TCP_BUFFER

Attributes

errors[R]
messages[R]
options[R]
responses[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/nsqrb/consumer.rb, line 10
def initialize(options = {})
  @options = options
  @messages = []
  @errors = []
  @responses = []
  @parser = Parser.new
end

Public Instance Methods

close!() click to toggle source
# File lib/nsqrb/consumer.rb, line 45
def close!
  return if @socket.nil? || @socket.closed?
  @socket.write Command::Cls.new.to_line
  @socket.close
  @socket = nil
end
confirm(message) click to toggle source
# File lib/nsqrb/consumer.rb, line 30
def confirm(message)
  @socket.write Command::Fin.new(message_id: message.id).to_line
  update_rdy
end
connect!() click to toggle source
# File lib/nsqrb/consumer.rb, line 52
def connect!
  close! if @socket && !@socket.closed?
  @socket = TCPSocket.open(options[:host], options[:port])
  @socket.write PROTOCOL_VERSION.rjust(4).upcase
  @socket.write Command::Identify.new(identify_defaults.merge(options[:features] || {})).to_line
  @socket.write Command::Sub.new(topic_name: options[:topic], channel_name: options[:channel]).to_line
  @socket.write Command::Rdy.new(count: 1).to_line
  puts 'Ready to receive!'
end
receive() click to toggle source
# File lib/nsqrb/consumer.rb, line 18
def receive
  begin
    buffer = @socket.recv(TCP_BUFFER)
    @parser.add(buffer)
    frames = @parser.parse
    frames.each { |frame| handle(frame) }
  rescue => e
    close!
    raise e
  end
end
requeue(message, timeout = 0) click to toggle source
# File lib/nsqrb/consumer.rb, line 35
def requeue(message, timeout = 0)
  @socket.write Command::Req.new(message_id: message.id, timeout: timeout).to_line
  update_rdy
end
touch(message) click to toggle source
# File lib/nsqrb/consumer.rb, line 40
def touch(message)
  @socket.write Command::Touch.new(message_id: message.id).to_line
  update_rdy
end

Private Instance Methods

handle(frame) click to toggle source
# File lib/nsqrb/consumer.rb, line 79
def handle(frame)
  if(frame.is_a?(Frame::Response) && frame.content == '_heartbeat_')
    @socket.write Command::Nop.new.to_line
  elsif frame.is_a?(Frame::Message)
    @messages << frame
  elsif frame.is_a?(Frame::Response)
    @responses << frame
  elsif frame.is_a?(Frame::Error)
    @errors << frame
  end
end
identify_defaults() click to toggle source
# File lib/nsqrb/consumer.rb, line 69
def identify_defaults
  return @identify_defaults if @identify_defaults
  @identify_defaults = {
    short_id: Socket.gethostname,
    long_id: Socket.gethostbyname(Socket.gethostname).flatten.compact.first,
    user_agent: "nsqrb/#{Nsqrb::VERSION}",
    feature_negotiation: true
  }
end
update_rdy() click to toggle source
# File lib/nsqrb/consumer.rb, line 65
def update_rdy
  @socket.write Command::Rdy.new(count: 1).to_line
end