class Fluent::TelemetryInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_telemetry_iosxr.rb, line 15
def configure(conf)
  super
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_telemetry_iosxr.rb, line 28
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_telemetry_iosxr.rb, line 19
def start
  super
  @hdr_parsed = false
  @buffer = ""
  @hdr_buffer = ""
  server_create(:in_telemetry_server, @port, bind: @bind, proto: :tcp) do |data, sock|
    receive_data(sock.remote_host, data)
  end
end

Protected Instance Methods

data_parse() click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxr.rb, line 70
def data_parse()
  log.info "parse telemetry data ..."
  obj = JSON.parse(@buffer)
  tag = obj['encoding_path']
  es = MultiEventStream.new
  for data in obj['data_json']
    time = data['timestamp'] / 1000
    obj = data['keys'].merge(data['content'])
    record = nil
    if @delete_nested
      record = obj.reject { |k,v| v.is_a? Array }
    else
      record = obj
    end
    es.add(time, record)
  end
  router.emit_stream(tag, es)
end
hdr_parse() click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxr.rb, line 60
def hdr_parse()
  log.info "parse telemetry header ..."
  msg_type = @hdr_buffer[0..1].unpack("n")[0]
  msg_encap = @hdr_buffer[2..3].unpack("n")[0]
  msg_hdr_ver = @hdr_buffer[4..5].unpack("n")[0]
  msg_flag = @hdr_buffer[6..7].unpack("n")[0]
  msg_len = @hdr_buffer[8..11].unpack("N")[0]
  log.info "msg_type=#{msg_type} msg_encap=#{msg_encap} msg_hdr_ver=#{msg_hdr_ver} msg_flag=#{msg_flag} msg_len=#{msg_len}"
  return msg_len
end
receive_data(host, data) click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxr.rb, line 34
def receive_data(host, data)
  log.info "receive data ..."
  if @hdr_parsed == false
    # Do I need to consider the case the header is in separate receive data?
    @hdr_buffer << data[0..11]
    @remaining_len = hdr_parse()
    @hdr_buffer = ""
    data = data[12..-1] # remove header
    @hdr_parsed = true
  end
  if data.length <= @remaining_len
    @buffer << data
    @remaining_len -= data.length
    if @remaining_len == 0
      data_parse()
      @buffer = ""  # reset
      @hdr_parsed = false
    end
  else # this happens when received data in fluentd tcp server contains multi telemetry messages.
    @buffer << data[0..@remaining_len-1]
    data_parse()
    @buffer = ""  # reset
    @hdr_parsed = false
    receive_data(host, data[@remaining_len..-1])
  end
end