class Lumberjack::Beats::Parser

Constants

FRAME_COMPRESSED
FRAME_DATA
FRAME_JSON_DATA
FRAME_WINDOW
PROTOCOL_VERSION_1
PROTOCOL_VERSION_2
SUPPORTED_PROTOCOLS

Public Class Methods

new() click to toggle source
# File lib/lumberjack/beats/server.rb, line 144
# Create a parser with an empty receive buffer, positioned at the start of
# the state machine: the :header state, which waits for 2 bytes.
def initialize
  # Binary encoding so buffer sizes and slices are counted in bytes.
  @buffer = "".force_encoding("BINARY")
  @buffer_offset = 0
  transition(:header, 2)
end

Public Instance Methods

compressed_lead(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 294
# State handler: read the 32-bit big-endian length that precedes a
# compressed frame, then wait for that many bytes in :compressed_payload.
def compressed_lead(&block)
  transition(:compressed_payload, get.unpack("N").first)
end
compressed_payload(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 299
# State handler: inflate a compressed frame and parse its contents.
#
# The inflated bytes are fed to a brand-new parser of the same class, using
# the caller's block, so frames nested inside the compressed payload are
# yielded exactly like top-level frames. This parser goes back to :header
# before the nested parse starts.
#
# NOTE(review): the nested parser is discarded afterwards, so any incomplete
# trailing frame inside the inflated payload is dropped - confirm intended.
# NOTE(review): inflation is not size-limited here; if the bytes come from an
# untrusted peer, consider bounding the output size.
def compressed_payload(&block)
  inflated = Zlib::Inflate.inflate(get)
  transition(:header, 2)

  self.class.new.feed(inflated, &block)
end
data_field_key(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 269
# State handler: store the bytes of the current field's key in @key, then
# wait for the 4-byte length of the matching value.
def data_field_key(&block)
  @key = get
  transition(:data_field_value_len, 4)
end
data_field_key_len(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 264
# State handler: read the 32-bit big-endian length of the next field key and
# wait for that many bytes in :data_field_key.
def data_field_key_len(&block)
  transition(:data_field_key, get.unpack("N").first)
end
data_field_value() { |:data, sequence, data| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 278
# State handler: read the value of the current field and store it under @key.
#
# While more fields remain in the frame, go on to the next key length. Once
# the last field has been read, the whole map is emitted to the block.
#
# The parser is moved back to :header *before* the block is called (the same
# order window_size uses). The value bytes have already been consumed at that
# point, so if the block raises, the parser must not be left in
# :data_field_value or the next feed would read unrelated bytes as a value.
#
# @yield [:data, sequence, data] once the final field of the frame is read
# @yieldparam sequence the sequence number stored by data_lead
# @yieldparam data [Hash] the key => value map of the frame
def data_field_value(&block)
  @value = get

  @data_count -= 1
  @data[@key] = @value

  # More key/value pairs to come: nothing to emit yet.
  return transition(:data_field_key_len, 4) if @data_count > 0

  # End of the field list: reset the state first, then emit the whole map.
  transition(:header, 2)
  yield :data, @sequence, @data
end
data_field_value_len(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 274
# State handler: read the 32-bit big-endian length of the next field value
# and wait for that many bytes in :data_field_value.
def data_field_value_len(&block)
  value_len = get.unpack("N").first
  transition(:data_field_value, value_len)
end
data_lead(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 258
# State handler: read the 8-byte lead of a data frame - two 32-bit
# big-endian integers, the sequence number and the number of key/value
# pairs - and start collecting pairs into a fresh map.
#
# NOTE(review): a pair count of 0 still moves to :data_field_key_len, so the
# following bytes would be read as a key length - confirm whether empty data
# frames can occur.
def data_lead(&block)
  lead = get.unpack("NN")
  @sequence = lead[0]
  @data_count = lead[1]
  @data = {}
  transition(:data_field_key_len, 4)
end
feed(data, &block) click to toggle source

Feed data to this parser.

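The data is appended to an internal buffer, and for as long as the buffer holds enough bytes for the current parser state, that state's handler is run. Parsed results (for example :version, :window_size, :data and :json events) are yielded to the given block. The method itself always returns nil.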

@param [String] data the string data to feed into the parser. @yield parsed protocol events, e.g. (:data, sequence, map) or (:json, sequence, payload). @return [nil] always nil; results are delivered through the block.

# File lib/lumberjack/beats/server.rb, line 167
# Append data to the buffer and drive the state machine.
#
# For as long as the buffer holds at least @need unread bytes, the handler
# method named by @state is invoked with the caller's block. Each handler
# consumes its bytes and selects the next state; handlers that complete a
# frame yield the result to the block.
#
# @param data [String] bytes to append to the buffer
# @return [nil] always nil; results are delivered through the block
def feed(data, &block)
  @buffer << data
  # Handlers are dispatched by name: @state is the handler method to call.
  send(@state, &block) while have?(@need)
  nil
end
get(length=nil) click to toggle source

Get the next 'length' bytes from the buffer as a string (defaults to the number of bytes the current state needs).

# File lib/lumberjack/beats/server.rb, line 194
# Consume bytes from the buffer, starting at the current read offset.
#
# @param length [Integer, nil] how many bytes to take; nil means @need
# @return [String] the slice of the buffer that was consumed
def get(length=nil)
  length = length.nil? ? @need : length
  stop = @buffer_offset + length
  chunk = @buffer[@buffer_offset...stop]
  @buffer_offset = stop

  # Consumed bytes are not removed on every read; once the read offset has
  # moved past 16384, the consumed prefix is dropped and the offset reset.
  if @buffer_offset > 16384
    @buffer = @buffer[@buffer_offset..-1]
    @buffer_offset = 0
  end

  chunk
end
handle_version(version) { |:version, version| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 229
# Report the protocol version of a frame to the block, rejecting versions
# that supported_protocol? does not accept.
#
# @param version the protocol version byte read from the frame header
# @yield [:version, version] when the version is supported
# @raise [RuntimeError] when the version is not supported
def handle_version(version, &block)
  raise "unsupported protocol #{version}" unless supported_protocol?(version)

  yield :version, version
end
have?(length) click to toggle source

Do we have at least 'length' bytes in the buffer?

# File lib/lumberjack/beats/server.rb, line 189
# Tell whether the buffer holds at least `length` bytes that have not been
# consumed yet (bytes before @buffer_offset do not count).
#
# @param length [Integer] number of bytes required
# @return [Boolean]
def have?(length)
  unread = @buffer.size - @buffer_offset
  length <= unread
end
header(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 214
# State handler: read the two-byte frame header - protocol version byte and
# frame type byte - and move to the state that parses that frame type.
#
# The version is passed to handle_version, which yields it to the block or
# raises when it is unsupported.
#
# @raise [RuntimeError] when the frame type byte is not a known frame type
def header(&block)
  version, frame_type = get.bytes.first(2)
  # Fall back to protocol version 1 when no version byte is present.
  version ||= PROTOCOL_VERSION_1

  handle_version(version, &block)

  case frame_type
  when FRAME_WINDOW
    transition(:window_size, 4)
  when FRAME_DATA
    transition(:data_lead, 8)
  when FRAME_JSON_DATA
    transition(:json_data_lead, 8)
  when FRAME_COMPRESSED
    transition(:compressed_lead, 4)
  else
    raise "Unknown frame type: `#{frame_type}`"
  end
end
json_data_lead(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 247
# State handler: read the 8-byte lead of a JSON frame - two 32-bit
# big-endian integers, the sequence number and the payload size - then wait
# for the payload itself.
def json_data_lead(&block)
  lead = get.unpack("NN")
  @sequence = lead[0]
  transition(:json_data_payload, lead[1])
end
json_data_payload() { |:json, sequence, load| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 252
# State handler: decode the payload of a JSON frame and emit it.
#
# The parser is moved back to :header *before* the payload is decoded and
# the block is called (the same order window_size uses). The payload bytes
# have already been consumed at that point, so if decoding fails or the block
# raises, the parser must not be left in :json_data_payload or the next feed
# would read the following frame's bytes as JSON payload.
#
# @yield [:json, sequence, event] the decoded payload
# @yieldparam sequence the sequence number stored by json_data_lead
# @yieldparam event the value returned by the JSON decoder
# @raise whatever the JSON decoder raises for a malformed payload
def json_data_payload(&block)
  payload = get
  transition(:header, 2)
  yield :json, @sequence, Lumberjack::Beats.json.load(payload)
end
need(length) click to toggle source

Set the minimum number of bytes we need in the buffer for the next read.

# File lib/lumberjack/beats/server.rb, line 206
# Record how many unread bytes must be buffered before the next state
# handler can run (feed compares this against the buffer via have?).
#
# @param length [Integer] number of bytes required
# @return the value that was stored
def need(length)
  @need = length
end
supported_protocol?(version) click to toggle source
# File lib/lumberjack/beats/server.rb, line 237
# Tell whether the given protocol version is listed in SUPPORTED_PROTOCOLS.
#
# @param version the protocol version byte read from a frame header
def supported_protocol?(version)
  SUPPORTED_PROTOCOLS.include?(version)
end
transition(state, next_length) click to toggle source
# File lib/lumberjack/beats/server.rb, line 151
# Switch the state machine to another state.
#
# @param state [Symbol] name of the handler method that feed will call next
# @param next_length [Integer] bytes that must be buffered before it runs
def transition(state, next_length)
  # TODO(sissel): Assert this self.respond_to?(state)
  # TODO(sissel): Assert state is in STATES
  # TODO(sissel): Assert next_length is a number
  @state = state
  need(next_length)
end
window_size() { |:window_size, window_size| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 241
# State handler: read the 32-bit big-endian window size of a window frame,
# remember it in @window_size, return to :header and report it to the block.
#
# @yield [:window_size, window_size] the size that was read
def window_size(&block)
  fields = get.unpack("N")
  @window_size = fields.first
  transition(:header, 2)
  yield :window_size, @window_size
end