class Fluent::TelemetryIosxeInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 32
# Configures the plugin. No plugin-specific work happens here; the
# configuration is handed to the superclass implementation unchanged.
#
# @param conf the configuration object forwarded to the superclass
def configure(conf)
  super(conf)
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 36
# Starts the input: installs a SIGINT handler, resets the per-session state,
# opens an SSH connection to @server, requests the "netconf" subsystem, sends
# the NETCONF <hello>, and then runs the Net::SSH loop until the SIGINT flag
# is set. Incoming channel data is forwarded to #receive_data.
#
# NOTE(review): @parser is read as the Nori backend option and then replaced
# by the Nori instance itself, so calling #start twice on the same object
# would pass a Nori instance as :parser — confirm this is never restarted.
#
# @raise [RuntimeError] when the "netconf" subsystem cannot be started
def start
  super

  # The handler only flips a flag; the SSH loop below polls it.
  @sigint = false
  trap(:INT) do
    log.info "got SIGINT ..."
    @sigint = true
  end

  # Per-session state consumed by receive_data / parse / subscribe.
  @hello_done = false
  @subscription_index = 0
  @subscription_ids = []
  @buffer = ""
  @parser = Nori.new(:parser => @parser, :advanced_typecasting => false)

  # `<<-` only lets the terminator be indented; the body is taken verbatim,
  # so it is kept flush-left to avoid sending leading whitespace.
  hello = <<-EOS
<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <capabilities>
    <capability>urn:ietf:params:netconf:base:1.0</capability>
    <capability>urn:ietf:params:netconf:base:1.1</capability>
  </capabilities>
</hello>]]>]]>
  EOS

  ssh_options = { :port => @port, :password => @password, :timeout => 10 }
  @ssh = Net::SSH.start(@server, @user, ssh_options)

  @channel = @ssh.open_channel do |channel|
    channel.subsystem("netconf") do |ch, success|
      raise "subsystem could not be started" unless success

      ch.on_data do |_c, data|
        log.debug "on data ..."
        log.debug data
        receive_data(data)
      end

      ch.on_close { |_c| log.debug "on close ..." }
      ch.on_eof { |_c| log.debug "on eof ..." }

      log.info "send hello"
      ch.send_data(hello)
    end
  end

  # if we get sigint, we need to end loop
  @ssh.loop(1) { !@sigint }
end
Protected Instance Methods
chunk_frame(msg)
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 202
# Wraps a message in a NETCONF chunked-framing header: a newline, '#', the
# chunk size, a newline, then the message itself. The end-of-chunks marker
# ("\n##\n") is NOT appended here; callers send it separately.
#
# The size is measured in bytes (octets), not characters, so a payload that
# contains multibyte characters is still framed with the correct length.
#
# @param msg [String] the payload to frame
# @return [String] a new string holding the header followed by the payload
def chunk_frame(msg)
  # "##{...}" reads as a literal '#' followed by an interpolation.
  "\n##{msg.bytesize}\n#{msg}"
end
handle_hello(data)
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 146
# Handles a parsed <hello> message: logs it and remembers the session id
# found under data["hello"]["session_id"] in @session_id.
#
# @param data [Hash] parsed message containing a "hello" entry
# @return the stored session id
def handle_hello(data)
  session_id = data["hello"]["session_id"]
  log.info "got hello session_id=#{session_id}"
  log.debug data
  @session_id = session_id
end
handle_notification(data)
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 206
# Handles a parsed <notification>: converts its eventTime (ISO 8601) to a
# Unix timestamp, optionally typecasts leaf values of the pushed datastore
# contents in place, and emits the contents through the router under @tag.
#
# Typecasting is controlled by @typecast_integer / @typecast_float and relies
# on the #is_i? / #is_f? predicates, which are defined outside this view
# (presumably on String — TODO confirm). Leaves that do not respond to those
# predicates (for example nil) are left untouched instead of raising
# NoMethodError.
#
# @param data [Hash] parsed message containing a "notification" entry
# @raise [ArgumentError] if eventTime is not a valid ISO 8601 timestamp
def handle_notification(data)
  log.info "got notification"
  log.debug data

  notification = data["notification"]
  unix_time = Time.iso8601(notification["eventTime"]).to_i
  content = notification["push_update"]["datastore_contents_xml"]

  traverse(content) do |node, key, parent|
    # Without a containing Hash/Array there is nowhere to write the result.
    next if parent.nil?

    if @typecast_integer && node.respond_to?(:is_i?) && node.is_i?
      parent[key] = node.to_i
    elsif @typecast_float && node.respond_to?(:is_f?) && node.is_f?
      parent[key] = node.to_f
    end
  end

  router.emit(@tag, unix_time, content)
end
handle_subscription_reply(data)
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 222
# Handles a parsed rpc reply that carries a subscription id: logs it and
# appends the id to @subscription_ids (which #shutdown later iterates).
#
# @param data [Hash] parsed message containing an "rpc_reply" entry
# @return [Array] the updated @subscription_ids
def handle_subscription_reply(data)
  subscription_id = data["rpc_reply"]["subscription_id"]
  log.info "got subscription reply subscription_id=#{subscription_id}"
  log.debug data
  @subscription_ids << subscription_id
end
parse()
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 120
# Parses the XML accumulated in @buffer and dispatches on the top-level key:
#
# * "hello"        -> #handle_hello, mark the hello phase done, first #subscribe
# * "notification" -> #handle_notification
# * "rpc_reply"    -> with a subscription id: #handle_subscription_reply and,
#                     while xpath filters remain, the next #subscribe;
#                     without one: log the subscription result as fatal
# * anything else  -> logged as a warning
#
# The buffer is reset to an empty string afterwards.
def parse
  log.debug "parse!"
  log.debug @buffer

  doc = @parser.parse(@buffer)

  if doc['hello']
    handle_hello(doc)
    @hello_done = true
    subscribe
  elsif doc['notification']
    handle_notification(doc)
  elsif (reply = doc["rpc_reply"])
    if reply["subscription_id"]
      handle_subscription_reply(doc)
      # Subscriptions are established one at a time, one per xpath filter.
      subscribe if @subscription_index < @xpath_filters.size
    else
      log.fatal reply["subscription_result"]
    end
  else
    log.warn "got other messages"
    log.debug doc
  end

  @buffer = ''
end
receive_data(data)
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 81
# Accumulates raw channel data in @buffer and calls #parse once a complete
# message has been seen. Two framings are handled, selected by @hello_done:
#
# * before the hello exchange: End-of-Message framing, where a message is
#   terminated by "]]>]]>";
# * afterwards: Chunked framing, where "#<size>" header lines are dropped and
#   "##" marks the end of the message.
#
# In End-of-Message framing the delimiter is removed from the data wherever
# it occurs, including when it shares a line with the closing tag or is
# followed by a newline, so it never reaches the XML parser.
#
# @param data [String] bytes received from the SSH channel
def receive_data(data)
  log.debug "receive data ..."

  if @hello_done
    # Chunked Framing
    message_complete = data.include?('##')
    unless data == '##'
      # Drop chunk-size headers; on the final packet drop the end marker too.
      framing_line = message_complete ? /^(#\d+|##)/ : /^#\d+/
      data.each_line do |line|
        @buffer << line unless line =~ framing_line
      end
    end
    parse if message_complete
  else
    # End-of-Message Framing
    end_of_message = ']]>]]>'
    if data.include?(end_of_message)
      @buffer << data.gsub(end_of_message, '')
      parse
    else
      @buffer << data
    end
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 185
# Shuts the input down: deletes every subscription recorded in
# @subscription_ids, sends a NETCONF <close-session> request (chunk-framed,
# followed by the end-of-chunks marker), closes the SSH connection and then
# delegates to the superclass.
def shutdown
  log.info "shutdown ..."

  @subscription_ids.each { |id| unsubscribe(id) }

  # `<<-` keeps the body verbatim, so it stays flush-left: no whitespace may
  # end up in front of the XML declaration.
  close_session = <<-EOS
<?xml version="1.0" encoding="utf-8"?>
<rpc message-id="101" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <close-session/>
</rpc>
  EOS

  frame = chunk_frame(close_session.chomp)
  @channel.send_data(frame)
  @channel.send_data("\n##\n")
  @ssh.close

  super
end
subscribe()
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 152
# Sends an <establish-subscription> request for the xpath filter at
# @subscription_index, using @period as the push period, then advances
# @subscription_index. The rpc message-id is the one-based filter position.
#
# @return [Integer] the incremented @subscription_index
def subscribe
  index = @subscription_index
  message_id = index + 1
  xpath_filter = @xpath_filters[index]

  # `<<-` keeps the body verbatim, so it stays flush-left: no whitespace may
  # end up in front of the XML declaration.
  subscription = <<-EOS
<?xml version="1.0" encoding="utf-8"?>
<rpc message-id="#{message_id}" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <establish-subscription xmlns="urn:ietf:params:xml:ns:yang:ietf-event-notifications" xmlns:yp="urn:ietf:params:xml:ns:yang:ietf-yang-push">
    <stream>yp:yang-push</stream>
    <yp:xpath-filter>#{xpath_filter}</yp:xpath-filter>
    <yp:period>#{@period}</yp:period>
  </establish-subscription>
</rpc>
  EOS

  log.info "subscribe [#{message_id}] #{xpath_filter}"
  frame = chunk_frame(subscription.chomp)
  log.debug frame

  @channel.send_data(frame)
  @channel.send_data("\n##\n")
  @subscription_index += 1
end
traverse(obj, key=nil, parent=nil, &blk)
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 228
# Walks a nested Hash/Array structure depth-first and calls the block for
# every leaf (anything that is neither a Hash nor an Array).
#
# When @strip_namespaces is set, "@xmlns" entries are removed from each Hash
# in place before its values are visited.
#
# @param obj the structure (or leaf) to visit
# @param key the Hash key or Array index under which obj sits in parent
# @param parent the Hash or Array containing obj; nil for the root
# @yield [node, key, parent] each leaf with its position in its container
def traverse(obj, key=nil, parent=nil, &blk)
  if obj.is_a?(Hash)
    obj.delete_if { |name, _value| name == "@xmlns" } if @strip_namespaces
    obj.each { |name, value| traverse(value, name, obj, &blk) }
  elsif obj.is_a?(Array)
    obj.each_with_index { |value, index| traverse(value, index, obj, &blk) }
  else
    blk.call(obj, key, parent)
  end
end
unsubscribe(id)
click to toggle source
# File lib/fluent/plugin/in_telemetry_iosxe.rb, line 170
# Sends a <delete-subscription> request for the given subscription id,
# chunk-framed and followed by the end-of-chunks marker.
#
# @param id the subscription id to delete
def unsubscribe(id)
  # `<<-` keeps the body verbatim, so it stays flush-left: no whitespace may
  # end up in front of the XML declaration.
  request = <<-EOS
<?xml version="1.0" encoding="utf-8"?>
<rpc message-id="101" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <delete-subscription xmlns="urn:ietf:params:xml:ns:yang:ietf-event-notifications" xmlns:netconf="urn:ietf:params:xml:ns:netconf:base:1.0">
    <subscription-id>#{id}</subscription-id>
  </delete-subscription>
</rpc>
  EOS

  log.info "unubscribe subscription_id=#{id}"
  frame = chunk_frame(request.chomp)
  log.debug frame

  @channel.send_data(frame)
  @channel.send_data("\n##\n")
end