class HPFeeds::Client

Public Class Methods

new(options) click to toggle source
# File lib/hpfeeds/client.rb, line 14
# Builds a client from an options hash and immediately tries to connect.
#
# Mandatory keys (a missing one makes +fetch+ raise KeyError):
# +:host+, +:ident+, +:secret+.
#
# Optional keys with the defaults applied below:
# +:port+ (10000), +:timeout+ (30), +:reconnect+ (true), +:sleepwait+ (20),
# +:log_to+ (STDOUT), +:log_level+ (:info).
def initialize(options)
  # Broker endpoint and credentials.
  @host   = options.fetch(:host)
  @port   = options.fetch(:port, 10000)
  @ident  = options.fetch(:ident)
  @secret = options.fetch(:secret)

  # Receive timeout and reconnect policy.
  @timeout   = options.fetch(:timeout, 30)
  @reconnect = options.fetch(:reconnect, true)
  @sleepwait = options.fetch(:sleepwait, 20)

  # Nothing is connected, stopped or subscribed yet.
  @connected  = false
  @stopped    = false
  @handlers   = {}
  @subscribed = []

  # Logging destination and verbosity.
  @logger       = Logger.new(options.fetch(:log_to, STDOUT))
  @logger.level = get_log_level(options.fetch(:log_level, :info))

  @decoder = Decoder.new

  # The constructor does not return until tryconnect has finished.
  tryconnect
end

Public Instance Methods

close() click to toggle source
# File lib/hpfeeds/client.rb, line 109
# Closes the current socket.
#
# A StandardError raised while closing is logged at warn level and is not
# re-raised.
def close
  @logger.debug("Closing socket")
  @socket.close
rescue => e
  @logger.warn("Socket exception when closing: #{e}")
end
connect() click to toggle source
# File lib/hpfeeds/client.rb, line 55
# Opens a TCP connection to the broker and performs the initial handshake.
#
# Steps: create an IPv4 stream socket, connect it to @host:@port, read one
# message header, require its opcode to be OP_INFO, read the info payload,
# answer with the auth message built by the decoder, mark the client as
# connected and enable keepalive on the socket.
#
# If any step fails, the socket created here is closed and @connected is
# cleared before the error propagates. Without that, every failed attempt
# (tryconnect retries in a loop) would leave one more descriptor open.
#
# Raises Exception when the TCP connect fails or the first message is not
# OP_INFO; errors from receive_data and the decoder propagate unchanged.
# NOTE(review): +Exception+ is whatever that constant resolves to inside this
# class. If it is ::Exception rather than a project-specific subclass,
# the bare +rescue+ in tryconnect will not catch it -- confirm.
def connect
  @socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM)
  established = false
  begin
    begin
      @logger.debug("connecting #{@host}:#{@port}")
      sockaddr = Socket.pack_sockaddr_in(@port, @host)
      @socket.connect(sockaddr)
    rescue => e
      raise Exception.new("Could not connect to broker: #{e}.")
    end

    @logger.debug("waiting for data")
    header = receive_data(HEADERSIZE, @timeout)
    opcode, len = @decoder.parse_header(header)
    @logger.debug("received header, opcode = #{opcode}, len = #{len}")

    raise Exception.new('Expected info message at this point.') unless opcode == OP_INFO

    # +len+ is treated as including the header, so only the remainder is read.
    data = receive_data(len - HEADERSIZE, @timeout)
    @logger.debug("received data = #{binary_to_hex(data)}")
    name, rand = @decoder.parse_info(data)
    @logger.debug("received INFO, name = #{name}, rand = #{binary_to_hex(rand)}")
    @brokername = name
    auth = @decoder.msg_auth(rand, @ident, @secret)
    @socket.send(auth, 0)

    @logger.info("connected to #{@host}, port #{@port}")
    @connected = true
    # set keepalive
    keepalive = @socket.setsockopt(Socket::Option.bool(:INET, :SOCKET, :KEEPALIVE, true))
    established = true
    keepalive
  ensure
    unless established
      @connected = false
      begin
        @socket.close
      rescue StandardError
        # Best-effort cleanup: never mask the error that aborted the handshake.
        nil
      end
    end
  end
end
publish(data, *channels) click to toggle source
# File lib/hpfeeds/client.rb, line 99
# Publishes the same +data+ to every channel in +channels+, one
# publish_to_channel call per channel, in the order given.
#
# Returns the array of channels.
def publish(data, *channels)
  channels.each { |channel| publish_to_channel(channel, data) }
end
run(error_callback = nil) click to toggle source
# File lib/hpfeeds/client.rb, line 118
# Main receive loop: reads messages from the broker and dispatches them until
# the client is stopped.
#
# error_callback:: optional callable. When an OP_ERROR message arrives it is
#                  called with the message payload; when it is nil an
#                  ErrorMessage is raised instead.
#
# OP_PUBLISH messages are parsed and passed to the handler registered for
# their channel as (name, channel, payload); messages on channels without a
# handler are ignored. An empty header read is treated as a lost connection
# and triggers tryconnect.
#
# The inner loop also checks @stopped, so a stop requested while connected
# (for example from inside a handler) ends the loop after the current message
# and no reconnect is attempted once stopped.
#
# Error handling:
# * Timeout: logged; re-raised unless @reconnect is set, otherwise the client
#   sleeps @sleepwait seconds and reconnects.
#   NOTE(review): after that reconnect the method returns instead of resuming
#   the receive loop -- confirm whether callers are expected to call run again.
# * ErrorMessage: logged at warn level and re-raised.
# * Any other StandardError: logged with its backtrace and swallowed.
def run(error_callback = nil)
  until @stopped
    while @connected && !@stopped
      header = receive_data(HEADERSIZE, @timeout)
      if header.empty?
        @connected = false
        break
      end
      opcode, len = @decoder.parse_header(header)
      @logger.debug("received header, opcode = #{opcode}, len = #{len}")
      data = receive_data(len - HEADERSIZE, @timeout)
      if opcode == OP_ERROR
        raise ErrorMessage.new(data) if error_callback.nil?
        error_callback.call(data)
      elsif opcode == OP_PUBLISH
        name, chan, payload = @decoder.parse_publish(data)
        @logger.debug("received #{payload.length} bytes of data from #{name} on channel #{chan}")
        handler = @handlers[chan]
        # ignore unhandled messages
        handler.call(name, chan, payload) unless handler.nil?
      end
    end
    # A requested stop must not trigger another connection attempt.
    break if @stopped
    @logger.debug("Lost connection, trying to connect again...")
    tryconnect
  end
rescue Timeout => e
  @logger.warn("#{e.class} caught while connecting: #{e}. Reconnecting in #{@sleepwait} seconds...")
  raise(e) unless @reconnect
  sleep(@sleepwait)
  tryconnect
rescue ErrorMessage => e
  @logger.warn("#{e.class} caught in main loop: #{e}")
  raise e
rescue => e
  message = "#{e.class} caught in main loop: #{e}\n"
  message += e.backtrace.join("\n")
  @logger.error(message)
end
stop() click to toggle source
# File lib/hpfeeds/client.rb, line 105
# Requests that the receive loop end by setting the @stopped flag, which
# +run+ consults as its loop condition. This method does not touch the socket.
def stop
  @stopped = true
end
subscribe(*channels, &block) click to toggle source
# File lib/hpfeeds/client.rb, line 86
# Subscribes to each of +channels+ and registers +block+ as the handler for
# messages received on them.
#
# For every channel a subscribe message is sent, the handler is stored
# (replacing any earlier handler for that channel) and the channel is
# remembered in @subscribed so tryconnect can re-subscribe after a reconnect.
# A channel is remembered only once, however often it is subscribed to;
# otherwise each reconnect would send duplicate subscribe messages and the
# list would grow without bound.
#
# Raises ArgumentError when no block is given.
# Returns the array of channels.
def subscribe(*channels, &block)
  unless block_given?
    raise ArgumentError, 'When subscribing to a channel, you have to provide a block as a callback for message handling'
  end

  channels.each do |channel|
    subscribe_to_channel(channel)
    @handlers[channel] = block
    @subscribed << channel unless @subscribed.include?(channel)
  end
end
tryconnect() click to toggle source
# File lib/hpfeeds/client.rb, line 39
# Connects to the broker and re-subscribes to every channel recorded in
# @subscribed, retrying until one attempt succeeds.
#
# On a StandardError the failure is logged at warn level. If @reconnect is
# falsy the error is re-raised; otherwise the method sleeps @sleepwait seconds
# and tries again.
def tryconnect
  loop do
    begin
      connect
      @subscribed.each { |channel| subscribe_to_channel(channel) }
      break
    rescue => e
      @logger.warn("#{e.class} caught while connecting: #{e}. Reconnecting in #{@sleepwait} seconds...")
      raise(e) unless @reconnect
      sleep(@sleepwait)
    end
  end
end

Private Instance Methods

binary_to_hex(s) click to toggle source
# File lib/hpfeeds/client.rb, line 165
# Renders the bytes of +s+ as a lowercase hex string prefixed with "0x".
#
# Used for log output only: any StandardError (for example when +s+ is nil)
# yields an empty string instead of propagating.
def binary_to_hex(s)
  hex_digits = s.unpack('H*').first
  "0x#{hex_digits}"
rescue
  ''
end
get_log_level(level) click to toggle source
# File lib/hpfeeds/client.rb, line 197
# Translates a log level name into the matching Logger severity constant.
#
# level:: a symbol or string such as :info or "debug"; case-insensitive.
#
# Only constants defined directly on Logger::Severity that hold an Integer are
# accepted. Looking the name up on Logger itself would also resolve unrelated
# constants (for example :version or :formatter) and return something that is
# not a severity.
#
# Returns the Integer severity.
# Raises ArgumentError when +level+ does not name a severity.
def get_log_level(level)
  name = level.to_s.upcase
  severity =
    begin
      Logger::Severity.const_get(name, false)
    rescue NameError
      nil
    end
  raise ArgumentError, "Unknown log level #{level}" unless severity.is_a?(Integer)

  severity
end
publish_to_channel(c, data) click to toggle source
# File lib/hpfeeds/client.rb, line 175
# Logs the publication at info level, builds a publish message for channel
# +c+ via the decoder (using this client's ident) and writes it to the socket.
#
# Returns whatever the socket's +send+ returns.
def publish_to_channel(c, data)
  @logger.info("publish to #{c}: #{data}")
  @socket.send(@decoder.msg_publish(@ident, c, data), 0)
end
read_from_socket(max) click to toggle source
# File lib/hpfeeds/client.rb, line 189
# Reads from the socket until +max+ bytes have been collected or the peer
# closes the connection.
#
# +recv+ may deliver fewer bytes than requested, so reads are repeated. When
# the connection is closed +recv+ returns an empty string (nil on newer Ruby
# versions); reading then stops and whatever was collected so far is returned.
# Without this check the loop would spin forever on a closed connection, and
# appending nil would raise TypeError.
#
# Returns the collected bytes; the result is shorter than +max+ (possibly
# empty) only when the connection was closed early.
def read_from_socket(max)
  data = String.new
  while data.size < max
    chunk = @socket.recv(max - data.size)
    break if chunk.nil? || chunk.empty?
    data << chunk
  end
  data
end
receive_data(max, timeout=nil) click to toggle source
# File lib/hpfeeds/client.rb, line 181
# Waits until the socket is readable, then reads +max+ bytes from it via
# read_from_socket.
#
# timeout:: seconds to wait for readability; it is passed straight to
#           IO.select, so nil waits indefinitely.
#
# Raises Timeout when the socket does not become readable in time.
def receive_data(max, timeout=nil)
  readable = IO.select([@socket], nil, nil, timeout)
  raise Timeout.new("Connection receive timeout.") unless readable

  read_from_socket(max)
end
subscribe_to_channel(c) click to toggle source
# File lib/hpfeeds/client.rb, line 169
# Logs the subscription at info level, builds a subscribe message for channel
# +c+ via the decoder (using this client's ident) and writes it to the socket.
#
# Returns whatever the socket's +send+ returns.
def subscribe_to_channel(c)
  @logger.info("subscribing to #{c}")
  @socket.send(@decoder.msg_subscribe(@ident, c), 0)
end