class Nsqrb::Consumer
Constants
- PROTOCOL_VERSION
- TCP_BUFFER
Attributes
errors[R]
messages[R]
options[R]
responses[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/nsqrb/consumer.rb, line 10 def initialize(options = {}) @options = options @messages = [] @errors = [] @responses = [] @parser = Parser.new end
Public Instance Methods
close!()
click to toggle source
# File lib/nsqrb/consumer.rb, line 45 def close! return if @socket.nil? || @socket.closed? @socket.write Command::Cls.new.to_line @socket.close @socket = nil end
confirm(message)
click to toggle source
# File lib/nsqrb/consumer.rb, line 30 def confirm(message) @socket.write Command::Fin.new(message_id: message.id).to_line update_rdy end
connect!()
click to toggle source
# File lib/nsqrb/consumer.rb, line 52 def connect! close! if @socket && !@socket.closed? @socket = TCPSocket.open(options[:host], options[:port]) @socket.write PROTOCOL_VERSION.rjust(4).upcase @socket.write Command::Identify.new(identify_defaults.merge(options[:features] || {})).to_line @socket.write Command::Sub.new(topic_name: options[:topic], channel_name: options[:channel]).to_line @socket.write Command::Rdy.new(count: 1).to_line puts 'Ready to receive!' end
receive()
click to toggle source
# File lib/nsqrb/consumer.rb, line 18 def receive begin buffer = @socket.recv(TCP_BUFFER) @parser.add(buffer) frames = @parser.parse frames.each { |frame| handle(frame) } rescue => e close! raise e end end
requeue(message, timeout = 0)
click to toggle source
# File lib/nsqrb/consumer.rb, line 35 def requeue(message, timeout = 0) @socket.write Command::Req.new(message_id: message.id, timeout: timeout).to_line update_rdy end
touch(message)
click to toggle source
# File lib/nsqrb/consumer.rb, line 40 def touch(message) @socket.write Command::Touch.new(message_id: message.id).to_line update_rdy end
Private Instance Methods
handle(frame)
click to toggle source
# File lib/nsqrb/consumer.rb, line 79 def handle(frame) if(frame.is_a?(Frame::Response) && frame.content == '_heartbeat_') @socket.write Command::Nop.new.to_line elsif frame.is_a?(Frame::Message) @messages << frame elsif frame.is_a?(Frame::Response) @responses << frame elsif frame.is_a?(Frame::Error) @errors << frame end end
identify_defaults()
click to toggle source
# File lib/nsqrb/consumer.rb, line 69 def identify_defaults return @identify_defaults if @identify_defaults @identify_defaults = { short_id: Socket.gethostname, long_id: Socket.gethostbyname(Socket.gethostname).flatten.compact.first, user_agent: "nsqrb/#{Nsqrb::VERSION}", feature_negotiation: true } end
update_rdy()
click to toggle source
# File lib/nsqrb/consumer.rb, line 65 def update_rdy @socket.write Command::Rdy.new(count: 1).to_line end