class HPFeeds::Client
Public Class Methods
new(options)
click to toggle source
# File lib/hpfeeds/client.rb, line 14 def initialize(options) @host = options.fetch(:host) @port = options.fetch(:port, 10000) @ident = options.fetch(:ident) @secret = options.fetch(:secret) @timeout = options.fetch(:timeout, 30) @reconnect = options.fetch(:reconnect, true) @sleepwait = options.fetch(:sleepwait, 20) @connected = false @stopped = false log_to = options.fetch(:log_to, STDOUT) log_level = options.fetch(:log_level, :info) @logger = Logger.new(log_to) @logger.level = get_log_level(log_level) @decoder = Decoder.new @handlers = {} @subscribed = [] tryconnect end
Public Instance Methods
close()
click to toggle source
# File lib/hpfeeds/client.rb, line 109 def close begin @logger.debug("Closing socket") @socket.close rescue => e @logger.warn("Socket exception when closing: #{e}") end end
connect()
click to toggle source
# File lib/hpfeeds/client.rb, line 55 def connect @socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM) begin @logger.debug("connecting #{@host}:#{@port}") sockaddr = Socket.pack_sockaddr_in( @port, @host ) @socket.connect(sockaddr) rescue => e raise Exception.new("Could not connect to broker: #{e}.") end @logger.debug("waiting for data") header = receive_data(HEADERSIZE, @timeout) opcode, len = @decoder.parse_header(header) @logger.debug("received header, opcode = #{opcode}, len = #{len}") if opcode == OP_INFO data = receive_data(len-HEADERSIZE, @timeout) @logger.debug("received data = #{binary_to_hex(data)}") name, rand = @decoder.parse_info(data) @logger.debug("received INFO, name = #{name}, rand = #{binary_to_hex(rand)}") @brokername = name auth = @decoder.msg_auth(rand, @ident, @secret) @socket.send(auth, 0) else raise Exception.new('Expected info message at this point.') end @logger.info("connected to #{@host}, port #{@port}") @connected = true # set keepalive @socket.setsockopt(Socket::Option.bool(:INET, :SOCKET, :KEEPALIVE, true)) end
publish(data, *channels)
click to toggle source
# File lib/hpfeeds/client.rb, line 99 def publish(data, *channels) for c in channels publish_to_channel c, data end end
run(error_callback = nil)
click to toggle source
# File lib/hpfeeds/client.rb, line 118 def run(error_callback = nil) begin while !@stopped while @connected header = receive_data(HEADERSIZE, @timeout) if header.empty? @connected = false break end opcode, len = @decoder.parse_header(header) @logger.debug("received header, opcode = #{opcode}, len = #{len}") data = receive_data(len - HEADERSIZE, @timeout) if opcode == OP_ERROR unless error_callback.nil? error_callback.call(data) else raise ErrorMessage.new(data) end elsif opcode == OP_PUBLISH name, chan, payload = @decoder.parse_publish(data) @logger.debug("received #{payload.length} bytes of data from #{name} on channel #{chan}") handler = @handlers[chan] unless handler.nil? # ignore unhandled messages handler.call(name, chan, payload) end end end @logger.debug("Lost connection, trying to connect again...") tryconnect end rescue Timeout => e @logger.warn("#{e.class} caught while connecting: #{e}. Reconnecting in #{@sleepwait} seconds...") raise(e) unless @reconnect sleep(@sleepwait) tryconnect rescue ErrorMessage => e @logger.warn("#{e.class} caught in main loop: #{e}") raise e rescue => e message = "#{e.class} caught in main loop: #{e}\n" message += e.backtrace.join("\n") @logger.error(message) end end
stop()
click to toggle source
# File lib/hpfeeds/client.rb, line 105 def stop @stopped = true end
subscribe(*channels, &block)
click to toggle source
# File lib/hpfeeds/client.rb, line 86 def subscribe(*channels, &block) if block_given? handler = block else raise ArgumentError.new('When subscribing to a channel, you have to provide a block as a callback for message handling') end for c in channels subscribe_to_channel c @handlers[c] = handler unless handler.nil? @subscribed << c end end
tryconnect()
click to toggle source
# File lib/hpfeeds/client.rb, line 39 def tryconnect loop do begin connect() for c in @subscribed subscribe_to_channel c end break rescue => e @logger.warn("#{e.class} caught while connecting: #{e}. Reconnecting in #{@sleepwait} seconds...") raise(e) unless @reconnect sleep(@sleepwait) end end end
Private Instance Methods
binary_to_hex(s)
click to toggle source
# File lib/hpfeeds/client.rb, line 165 def binary_to_hex s "0x#{s.unpack('H*')[0]}" rescue '' end
get_log_level(level)
click to toggle source
# File lib/hpfeeds/client.rb, line 197 def get_log_level(level) begin Logger.const_get(level.to_s.upcase) rescue raise ArgumentError.new("Unknown log level #{level}") end end
publish_to_channel(c, data)
click to toggle source
# File lib/hpfeeds/client.rb, line 175 def publish_to_channel c, data @logger.info("publish to #{c}: #{data}") message = @decoder.msg_publish(@ident, c, data) @socket.send(message, 0) end
read_from_socket(max)
click to toggle source
# File lib/hpfeeds/client.rb, line 189 def read_from_socket(max) data = '' while (data.size < max) do data += @socket.recv(max - data.size) end data end
receive_data(max, timeout=nil)
click to toggle source
# File lib/hpfeeds/client.rb, line 181 def receive_data(max, timeout=nil) if IO.select([@socket], nil, nil, timeout) read_from_socket(max) else raise Timeout.new("Connection receive timeout.") end end
subscribe_to_channel(c)
click to toggle source
# File lib/hpfeeds/client.rb, line 169 def subscribe_to_channel c @logger.info("subscribing to #{c}") message = @decoder.msg_subscribe(@ident, c) @socket.send(message, 0) end