class Fyrehose::InputStream

Attributes

buf[RW]
pos[RW]
state[RW]

Public Class Methods

new() click to toggle source
# File lib/fyrehose/input_stream.rb, line 6
def initialize
  @state = -7
  @pos = 0
  @buf = ""
end

Public Instance Methods

<<(chunk) click to toggle source
# File lib/fyrehose/input_stream.rb, line 12
def <<(chunk)
  buf << chunk
end
each() { |{ :type => type, :txid => txid, :channel => channel, :flags => len_or_flags, :body => body }| ... } click to toggle source
# File lib/fyrehose/input_stream.rb, line 16
def each
  trim = 0

  while pos < buf.size do
    case state

      when -7
        self.state += 1
        type = nil
        txid = ""
        channel = ""
        len_or_flags = ""
        body = ""
        next

      when -6
        raise ProtocolError if buf[pos] != "#"
        self.state += 1

      when -3, -5
        if buf[pos] == " "
          self.state += 1
        else
          txid    << buf[pos] if state == -5
          channel << buf[pos] if state == -3
        end

      when -4
        if buf[pos] == "$"
          type = :ack
          self.state = -1
        elsif buf[pos] == "@"
          self.state += 1
        else
          raise ProtocolError
        end

      when -2
        self.state += 1
        if buf[pos] == "*"
          type = :data
        elsif buf[pos] == "+"
          type = :flags
        else
          raise ProtocolError
        end

      when -1
        if buf[pos] == " " || buf[pos] == "\n"
          len_or_flags = len_or_flags.to_i
          if type == :data
            self.state = len_or_flags
          else
            self.state += 1
            next
          end
        else
          len_or_flags << buf[pos]
        end

      when 0
        yield({
          :type => type,
          :txid => txid,
          :channel => channel,
          :flags => len_or_flags,
          :body => body
        })
        trim = pos + 1
        self.state = -7

      else
        self.state -= 1
        body << buf[pos]

    end

    self.pos += 1
  end

  if trim > 0
    self.pos -= trim
    self.buf = self.buf[trim..-1]
  end

rescue Fyrehose::ProtocolError => e
  raise Fyrehose::ProtocolError.new(self.buf, self.pos)
end