class HrrRbNetconf::Server::Session

Attributes

session_id[R]

Public Class Methods

new(server, capabilities, datastore, session_id, io, strict_capabilities, enable_filter, logger: nil) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 21
def initialize server, capabilities, datastore, session_id, io, strict_capabilities, enable_filter, logger: nil
  self.logger = logger
  @server = server
  @local_capabilities = capabilities
  @remote_capabilities = Array.new
  @datastore = datastore
  @session_id = session_id
  @io_r, @io_w = case io
                 when IO
                   [io, io]
                 when Array
                   [io[0], io[1]]
                 else
                   raise ArgumentError, "io must be an instance of IO or Array"
                 end
  @strict_capabilities = strict_capabilities
  @enable_filter = enable_filter
  @closed = false
  @notification_enabled = false
  @monitor = Monitor.new
  @subscribed_streams = Hash.new
  @subscribed_streams_stop_time = Hash.new
  @notification_replay_thread = nil
  @subscription_termination_thread = nil
end

Public Instance Methods

close() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 60
def close
  log_info { "Being closed" }
  @closed = true
  @io_r.close_read rescue nil
  @notification_replay_thread.exit if @notification_replay_thread rescue nil
  @notification_termination_thread.exit if @notification_termination_thread rescue nil
  log_info { "Closed" }
end
close_other(session_id) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 173
def close_other session_id
  @server.close_session session_id
end
closed?() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 69
def closed?
  @closed
end
create_subscription(stream, start_time, stop_time) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 199
def create_subscription stream, start_time, stop_time
  log_info { "Create subscription for stream: #{stream}" }
  @subscribed_streams[stream] = true
  start_notification_termination_thread stream, start_time, stop_time if stop_time
  log_info { "Create subscription done for stream: #{stream}" }
end
exchange_hello() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 73
def exchange_hello
  send_hello
  receive_hello
end
filter_and_send_notification(matched_streams, event_xml) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 299
def filter_and_send_notification matched_streams, event_xml
  unless (matched_streams & @subscribed_streams.keys).empty?
    #event_e = filter(event_xml)
    event_e = event_xml
    send_notification event_e
  end
end
initialize_sender_and_receiver() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 122
def initialize_sender_and_receiver
  base_capability = @capabilities.list_loadable.select{ |c| /^urn:ietf:params:netconf:base:\d+\.\d+$/ =~ c }.sort.last
  log_info { "Base NETCONF capability: #{base_capability}" }
  @sender   = Capability[base_capability]::Sender.new   @io_w, logger: logger
  @receiver = Capability[base_capability]::Receiver.new @io_r, logger: logger
end
lock(target) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 177
def lock target
  @server.lock target, @session_id
end
negotiate_capabilities() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 113
def negotiate_capabilities
  @capabilities = @local_capabilities.negotiate @remote_capabilities
  log_info { "Negotiated capabilities: #{@capabilities.list_loadable}" }
  unless @capabilities.list_loadable.any?{ |c| /^urn:ietf:params:netconf:base:\d+\.\d+$/ =~ c }
    log_error { "No base NETCONF capability negotiated" }
    raise  "No base NETCONF capability negotiated"
  end
end
notification_replay(stream, start_time, stop_time, events) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 212
def notification_replay stream, start_time, stop_time, events
  if @server.notification_stream_support_replay? stream
    start_notification_replay_thread stream, start_time, stop_time, events
  else
    log_error { "Notification replay is not supported by stream: #{stream}" }
    raise Error['operation-failed'].new('protocol', 'error', logger: logger)
  end
end
operation_loop() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 129
def operation_loop
  datastore_session = @datastore.new_session self
  operation = Operation.new self, @capabilities, datastore_session, @strict_capabilities, @enable_filter, logger: logger

  begin
    loop do
      break if closed?
      begin
        received_message = @receiver.receive_message
        break unless received_message
      rescue Error => e
        rpc_reply_e = REXML::Element.new("rpc-reply")
        rpc_reply_e.add_namespace("urn:ietf:params:xml:ns:netconf:base:1.0")
        rpc_reply_e.add e.to_rpc_error
      rescue => e
        log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
        raise
      end
      @monitor.synchronize do
        begin
          rpc_reply_e = operation.run received_message
        rescue Error => e
          rpc_reply_e = received_message.clone
          rpc_reply_e.name = "rpc-reply"
          rpc_reply_e.add e.to_rpc_error
        rescue => e
          log_error { [e.backtrace[0], ": ", e.message, " (", e.class.to_s, ")\n\t", e.backtrace[1..-1].join("\n\t")].join }
          raise
        ensure
          begin
            @sender.send_message rpc_reply_e if rpc_reply_e
          rescue IOError
            break
          end
        end
      end
    end
  ensure
    datastore_session.close rescue nil
    @io_w.close_write rescue nil
  end
  log_info { "Exit operation_loop" }
end
receive_hello() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 99
def receive_hello
  buf = String.new
  loop do
    buf += @io_r.read(1)
    if buf[-6..-1] == ']]>]]>'
      break
    end
  end
  log_debug { "Received hello message: #{buf[0..-7].inspect}" }
  remote_capabilities_xml_doc = REXML::Document.new(buf[0..-7], {:ignore_whitespace_nodes => :all})
  remote_capabilities_xml_doc.each_element('/hello/capabilities/capability'){ |c| @remote_capabilities.push c.text }
  log_info { "Remote capabilities: #{@remote_capabilities}" }
end
send_hello() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 78
def send_hello
  log_info { "Local capabilities: #{@local_capabilities}" }
  xml_doc = REXML::Document.new
  hello_e = xml_doc.add_element 'hello'
  hello_e.add_namespace('urn:ietf:params:xml:ns:netconf:base:1.0')
  capabilities_e = hello_e.add_element 'capabilities'
  @local_capabilities.list_loadable.each{ |c|
    capability_e = capabilities_e.add_element 'capability'
    capability_e.text = c
  }
  session_id_e = hello_e.add_element 'session-id'
  session_id_e.text = @session_id.to_s

  buf = String.new
  formatter = REXML::Formatters::Pretty.new(2)
  formatter.compact = true
  formatter.write(xml_doc, buf)
  log_debug { "Sending hello message: #{buf.inspect}" }
  @io_w.write "#{buf}\n]]>]]>"
end
send_notification(event_e) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 284
def send_notification event_e
  @monitor.synchronize do
    notif_e = REXML::Element.new("notification")
    notif_e.add_namespace("urn:ietf:params:xml:ns:netconf:notification:1.0")
    event_e.elements.each{ |e|
      notif_e.add e.deep_clone
    }
    begin
      @sender.send_message notif_e
    rescue IOError => e
      log_warn { "Failed sending notification: #{e.message}" }
    end
  end
end
send_notification_complete(stream) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 315
def send_notification_complete stream
  event_xml = HrrRbRelaxedXML::Document.new
  event_time_e = event_xml.add_element('eventTime')
  event_time_e.text = DateTime.now.rfc3339
  event_e = event_xml.add_element("notificationComplete")
  send_notification event_xml
end
send_replay_complete(stream) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 307
def send_replay_complete stream
  event_xml = HrrRbRelaxedXML::Document.new
  event_time_e = event_xml.add_element('eventTime')
  event_time_e.text = DateTime.now.rfc3339
  event_e = event_xml.add_element("replayComplete")
  send_notification event_xml
end
start() click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 47
def start
  begin
    exchange_hello
    negotiate_capabilities
    initialize_sender_and_receiver
    operation_loop
  rescue
    raise
  ensure
    close
  end
end
start_notification_replay_thread(stream, start_time, stop_time, events) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 225
def start_notification_replay_thread stream, start_time, stop_time, events
  @notification_replay_thread = Thread.new do
    log_info { "Notification replay start for stream: #{stream}" }
    begin
      @monitor.synchronize do
        unless events.respond_to? :each
          log_error { "Argument `events' doesn't respond to :each method: #{events}" }
        else
          begin
            events.each{ |arg1, arg2|
              event_xml = NotificationEvent.new(arg1, arg2).to_xml
              if @server.event_match_stream? event_xml, stream
                if start_time
                  event_time = DateTime.rfc3339(event_xml.elements['eventTime'].text)
                  if start_time < event_time
                    if stop_time.nil? || event_time < stop_time
                      send_notification event_xml
                    end
                  end
                end
              end
            }
          rescue => e
            log_error { "Got an exception during processing replay: #{e.message}" }
          end
        end
        send_replay_complete stream
      end
    ensure
      log_info { "Notification replay completed for stream: #{stream}" }
    end
  end
end
start_notification_termination_thread(stream, start_time, stop_time) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 259
def start_notification_termination_thread stream, start_time, stop_time
  @notification_termination_thread = Thread.new do
    log_info { "Notification termination thread start for stream: #{stream}" }
    begin
      loop do
        now = DateTime.now
        if now.to_time < stop_time.to_time
          sleep_time = ((stop_time.to_time - now.to_time) / 2.0).ceil
          log_debug { "Notification termination thread for stream: #{stream}: sleep [sec]: #{sleep_time}" }
          sleep sleep_time
        else
          log_info { "Notification termination thread terminates subscription for stream: #{stream}" }
          @monitor.synchronize do
            terminate_subscription stream
            send_notification_complete stream
          end
          break
        end
      end
    ensure
      log_info { "Notification termination completed for stream: #{stream}" }
    end
  end
end
stream_subscribed?(stream) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 221
def stream_subscribed? stream
  @subscribed_streams.has_key? stream
end
subscription_creatable?(stream) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 185
def subscription_creatable? stream
  log_info { "Check subscription for stream: #{stream}" }
  if ! @server.has_notification_stream? stream
    log_error { "Server doesn't have notification stream: #{stream}" }
    false
  elsif @subscribed_streams.has_key? stream
    log_error { "Session already has subscription for stream: #{stream}" }
    false
  else
    log_info { "Subscription creatable for stream: #{stream}" }
    true
  end
end
terminate_subscription(stream) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 206
def terminate_subscription stream
  log_info { "Terminate subscription for stream: #{stream}" }
  @subscribed_streams.delete stream
  log_info { "Terminate subscription done for stream: #{stream}" }
end
unlock(target) click to toggle source
# File lib/hrr_rb_netconf/server/session.rb, line 181
def unlock target
  @server.unlock target, @session_id
end