class RubyKpi::KP

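The KP class: a client bound to one SIB address/port and one smart space. It builds SSAP requests (join, leave, insert, remove, update, RDF and SPARQL query, subscribe, unsubscribe), sends each one over a TCPConnection and reads the answer through ReplyMessage.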

Attributes

active_subscriptions[R]

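Read-only. Hash of the active subscriptions, keyed by subscription id; each value is a Hash holding the subscription's "socket" and its receiver "thread".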

ip[R]

Read-only. Address of the SIB, as given to the constructor; it is passed to every TCPConnection.

last_reply[R]

Read-only. Raw text of the most recent reply stored by this KP; nil until the first reply is received.

last_request[R]

Read-only. Raw text of the most recent SSAP request built by this KP; nil until the first request is made.

node_id[R]

Read-only. Identifier generated with UUID when the KP is constructed; it is placed in every SSAP request.

port[R]

Read-only. Port of the SIB, as given to the constructor; it is passed to every TCPConnection.

ss[R]

Read-only. Name of the smart space, as given to the constructor; it is placed in every SSAP request.

Public Class Methods

new(ip, port, smart_space, debug = false) click to toggle source

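Constructor: stores the SIB address, port and smart space name, generates the node id, starts the transaction counter at 1 and, when debug is true, creates a Logger writing to STDOUT.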

# File lib/ruby_kpi.rb, line 87
# Builds a KP bound to one SIB endpoint and one smart space.
#
# ip          - SIB address, later handed to TCPConnection
# port        - SIB port, later handed to TCPConnection
# smart_space - smart space name placed in the SSAP requests
# debug       - when truthy, a Logger on STDOUT is created and used to
#               trace calls; otherwise no logger is created at all
def initialize(ip, port, smart_space, debug = false)

  # tracing is opt-in: @logger only exists in debug mode
  @debug = debug
  if @debug
    @logger = Logger.new(STDOUT)
    @logger.debug("KP:initialize")
  end

  # where to reach the SIB and which smart space to address
  @ip, @port, @ss = ip, port, smart_space

  # protocol state: the transaction counter starts at 1 and the node id
  # is generated once per KP
  @transaction_id = 1
  @node_id = UUID.new().generate()

  # nothing has been exchanged or subscribed yet
  @last_request, @last_reply = nil, nil
  @active_subscriptions = {}

end

Public Instance Methods

insert(triple_list) click to toggle source

INSERT

# File lib/ruby_kpi.rb, line 204
# Inserts a list of triples into the smart space with one SSAP INSERT
# request.
#
# triple_list - collection of triples (must respond to map); each triple
#               responds to subject, predicate and object, and each of
#               those responds to value
#
# Returns the result of ReplyMessage#success? for the SIB's reply.
# Exceptions raised while sending or receiving propagate to the caller;
# the connection is closed first and the transaction id is not advanced.
def insert(triple_list)

  # debug print
  @logger.debug("KP:insert") if @debug

  # build the triple string; the type written for subject and object is
  # the downcased, unqualified name of their class
  triple_string = triple_list.map { |triple|
    TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase,
                       triple.subject.value,
                       triple.predicate.value,
                       triple.object.class.to_s.split("::").last.downcase,
                       triple.object.value]
  }.join

  # build and store the SSAP INSERT REQUEST (stored before the exchange,
  # like the other request methods, so it is available after a failure)
  msg = INSERT_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request and waiting for the reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()
  ensure
    # the socket is released even when the exchange fails
    tcpc.close()
  end

  # storing last reply
  @last_reply = rmsg

  # read the reply
  r = ReplyMessage.new(rmsg)

  # increment transaction id
  @transaction_id += 1

  # return
  return r.success?()

end
join_ss() click to toggle source

JOIN: sends an SSAP JOIN request to the SIB and returns whether the reply reports success.

# File lib/ruby_kpi.rb, line 115
# Joins the smart space by sending an SSAP JOIN request to the SIB.
#
# Returns the result of ReplyMessage#success? for the SIB's reply.
# Exceptions raised while sending or receiving propagate to the caller;
# the connection is closed first and the transaction id is not advanced.
def join_ss()

  # debug print
  @logger.debug("KP:join_ss") if @debug

  # building and storing the SSAP JOIN REQUEST
  msg = JOIN_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request message and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()
  ensure
    # the socket is released even when the exchange fails
    tcpc.close()
  end

  # storing last reply
  @last_reply = rmsg

  # reading message
  r = ReplyMessage.new(rmsg)

  # increment transaction id
  @transaction_id += 1

  # return
  return r.success?()

end
leave_ss() click to toggle source

LEAVE

# File lib/ruby_kpi.rb, line 159
# Leaves the smart space by sending an SSAP LEAVE request to the SIB.
#
# Returns the result of ReplyMessage#success? for the SIB's reply.
# Exceptions raised while sending, receiving or parsing propagate to the
# caller; the connection is always closed, and a reply that was received
# is kept in @last_reply even when it cannot be parsed.
def leave_ss()

  # debug print
  @logger.debug("KP:leave_ss") if @debug

  # building and storing the SSAP LEAVE REQUEST
  msg = LEAVE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()
  ensure
    # the socket is released even when the exchange fails
    tcpc.close()
  end

  # storing last reply before parsing it, so the raw text survives a
  # parse failure
  @last_reply = rmsg

  # instantiate a new ReplyMessage
  r = ReplyMessage.new(rmsg)

  # increment transaction id
  @transaction_id += 1

  # return
  return r.success?()

end
rdf_indication_receiver(tcpc, subscription_id, handler) click to toggle source

rdf indication receiver

# File lib/ruby_kpi.rb, line 628
# Receive loop for one RDF subscription; rdf_subscribe runs it in a
# dedicated thread.
#
# tcpc            - connection on which the SIB sends the messages
# subscription_id - key of the subscription in @active_subscriptions
# handler         - object whose handle(added, removed) is called for every
#                   indication; nil is tolerated and the indication is
#                   then dropped
#
# Messages whose type is "INDICATION" are dispatched to the handler. Any
# other message is treated as the answer to an unsubscribe: if it is
# unsuccessful the loop keeps listening; if it reports success the raw
# reply is stored, the subscription is unregistered, its socket is closed
# and the success value is returned.
def rdf_indication_receiver(tcpc, subscription_id, handler)

  # debug print
  @logger.debug("KP:rdf_indication_receiver") if @debug

  # Endless loop, left only through the return below (or an exception)
  while true

    # receive
    rmsg = tcpc.receive_reply()
    r = ReplyMessage.new(rmsg)

    # is it an indication?
    if r.get_message_type() == "INDICATION"

      # debug print
      @logger.debug("KP:rdf_indication_receiver -- INDICATION") if @debug

      # call the handler
      added, removed = r.get_rdf_triples_from_indication()
      handler.handle(added, removed) if handler
      next

    end

    # it is an unsubscribe confirm; keep listening unless it succeeded
    confirmed = r.success?()
    next unless confirmed

    # debug print
    @logger.debug("KP:rdf_indication_receiver -- UNSUBSCRIBE CONFIRM") if @debug

    # save the reply
    @last_reply = rmsg

    # close subscription: prefer the registered socket, but fall back to
    # the connection we are reading from when the subscription is not
    # registered, instead of failing on nil
    subscription = @active_subscriptions.delete(subscription_id)
    socket = subscription ? subscription["socket"] : tcpc
    socket.close()

    # return
    return confirmed

  end
end
rdf_query(triple) click to toggle source

RDF QUERY

# File lib/ruby_kpi.rb, line 376
# Runs an RDF pattern query with one SSAP QUERY request.
#
# triple - the pattern; responds to subject, predicate and object, each
#          of which responds to value
#
# Returns two values: the result of ReplyMessage#success? and the result
# of ReplyMessage#get_rdf_triples for the SIB's reply.
# Exceptions raised while sending, receiving or parsing propagate to the
# caller; the connection is always closed.
def rdf_query(triple)

  # debug print
  @logger.debug("KP:rdf_query") if @debug

  # build the triple; the type written for subject and object is the
  # downcased, unqualified name of their class
  triple_string = TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase,
                                     triple.subject.value,
                                     triple.predicate.value,
                                     triple.object.class.to_s.split("::").last.downcase,
                                     triple.object.value]

  # build and store the SSAP QUERY REQUEST
  msg = RDF_QUERY_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()
  ensure
    # the socket is released even when the exchange fails; parsing below
    # works on the received text and does not need the socket
    tcpc.close()
  end

  # storing and reading the reply
  @last_reply = rmsg
  r = ReplyMessage.new(rmsg)

  # increment transaction id
  @transaction_id += 1

  # return
  return r.success?(), r.get_rdf_triples()

end
rdf_subscribe(triple, myHandlerClass) click to toggle source

RDF Subscription

# File lib/ruby_kpi.rb, line 470
# Subscribes to an RDF pattern with one SSAP SUBSCRIBE request.
#
# triple         - the pattern; responds to subject, predicate and object,
#                  each of which responds to value
# myHandlerClass - class instantiated with no arguments; the instance's
#                  handle(added, removed) receives the indications
#
# Returns three values: the result of ReplyMessage#success?, the
# subscription id and the initial results (ReplyMessage#get_rdf_triples).
#
# When the SIB accepts the subscription, the connection stays open, the
# subscription is registered in @active_subscriptions and a thread running
# rdf_indication_receiver is started. When the reply does not report
# success, or an exception is raised, the connection is closed and nothing
# is registered or started.
def rdf_subscribe(triple, myHandlerClass)

  # debug print
  @logger.debug("KP:rdf_subscribe") if @debug

  # build the triple; the type written for subject and object is the
  # downcased, unqualified name of their class
  triple_string = TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase,
                                     triple.subject.value,
                                     triple.predicate.value,
                                     triple.object.class.to_s.split("::").last.downcase,
                                     triple.object.value]

  # build and store the SSAP SUBSCRIBE REQUEST
  msg = RDF_SUBSCRIBE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  # the socket must outlive this method only for an accepted subscription
  subscribed = false

  begin

    # sending the request and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()

    # storing and reading the last reply
    @last_reply = rmsg
    r = ReplyMessage.new(rmsg)

    # increment transaction id
    @transaction_id += 1

    # get the subscription id and the initial results
    subscription_id = r.get_subscription_id()
    triple_list = r.get_rdf_triples()
    success = r.success?()

    if success

      # instantiate the handler class
      h = myHandlerClass.new()

      # register the subscription and its socket BEFORE starting the
      # receiver, which looks the subscription up when it terminates
      subscription = { "socket" => tcpc }
      @active_subscriptions[subscription_id] = subscription

      # start the thread and store it with the subscription
      subscription["thread"] = Thread.new{rdf_indication_receiver(tcpc, subscription_id, h)}
      subscribed = true

    end

  ensure
    # a rejected or failed subscription must not leave the socket open
    tcpc.close() unless subscribed
  end

  # return
  return success, subscription_id, triple_list

end
remove(triple_list) click to toggle source

REMOVE

# File lib/ruby_kpi.rb, line 261
# Removes a list of triples from the smart space with one SSAP REMOVE
# request.
#
# triple_list - collection of triples (must respond to map); each triple
#               responds to subject, predicate and object, and each of
#               those responds to value
#
# Returns the result of ReplyMessage#success? for the SIB's reply.
# Exceptions raised while sending or receiving propagate to the caller;
# the connection is closed first and the transaction id is not advanced.
def remove(triple_list)

  # debug print
  @logger.debug("KP:remove") if @debug

  # build the triple string; the type written for subject and object is
  # the downcased, unqualified name of their class
  triple_string = triple_list.map { |triple|
    TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase,
                       triple.subject.value,
                       triple.predicate.value,
                       triple.object.class.to_s.split("::").last.downcase,
                       triple.object.value]
  }.join

  # building and storing the SSAP REMOVE REQUEST
  msg = REMOVE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()
  ensure
    # the socket is released even when the exchange fails
    tcpc.close()
  end

  # storing last reply
  @last_reply = rmsg

  # increment transaction id
  @transaction_id += 1

  # parsing the message to get the return value
  r = ReplyMessage.new(rmsg)
  return r.success?()

end
sparql_indication_receiver(tcpc, subscription_id, handler) click to toggle source

sparql indication receiver

# File lib/ruby_kpi.rb, line 689
# Receive loop for one SPARQL subscription; sparql_subscribe runs it in a
# dedicated thread.
#
# tcpc            - connection on which the SIB sends the messages
# subscription_id - key of the subscription in @active_subscriptions
# handler         - object whose handle(added, removed) is called for every
#                   indication, or nil (sparql_subscribe passes nil when no
#                   handler class was given); with nil the indication is
#                   dropped instead of crashing the thread
#
# Messages whose type is "INDICATION" are dispatched to the handler. Any
# other message is treated as the answer to an unsubscribe: if it is
# unsuccessful the loop keeps listening; if it reports success the raw
# reply is stored, the subscription is unregistered, its socket is closed
# and the success value is returned.
def sparql_indication_receiver(tcpc, subscription_id, handler)

  # debug print
  @logger.debug("KP:sparql_indication_receiver") if @debug

  # Endless loop, left only through the return below (or an exception)
  while true

    # receive; ReplyMessage is the only consumer of the text (the extra
    # XML parse that used to follow produced a document nobody read)
    rmsg = tcpc.receive_reply()
    r = ReplyMessage.new(rmsg)

    # is it an indication?
    if r.get_message_type() == "INDICATION"

      # debug print
      @logger.debug("KP:sparql_indication_receiver -- INDICATION") if @debug

      # extract the results from the indication and launch the handler
      added, removed = r.get_sparql_results_from_indication()
      handler.handle(added, removed) if handler
      next

    end

    # it is an unsubscribe confirm; keep listening unless it succeeded
    confirmed = r.success?()
    next unless confirmed

    # debug print
    @logger.debug("KP:sparql_indication_receiver -- UNSUBSCRIBE CONFIRM") if @debug

    # save the reply
    @last_reply = rmsg

    # close subscription: prefer the registered socket, but fall back to
    # the connection we are reading from when the subscription is not
    # registered, instead of failing on nil
    subscription = @active_subscriptions.delete(subscription_id)
    socket = subscription ? subscription["socket"] : tcpc
    socket.close()

    # return
    return confirmed

  end
end
sparql_query(q) click to toggle source

SPARQL QUERY

# File lib/ruby_kpi.rb, line 425
# Runs a SPARQL query with one SSAP SPARQL QUERY request.
#
# q - the query text; "<" and ">" are replaced by "&lt;" and "&gt;"
#     before the text is placed in the request
#
# Returns two values: the result of ReplyMessage#success? and the result
# of ReplyMessage#get_sparql_results for the SIB's reply.
# Exceptions raised while sending or receiving propagate to the caller;
# the connection is closed first and the transaction id is not advanced.
def sparql_query(q)

  # debug print
  @logger.debug("KP:sparql_query") if @debug

  # build and store the SSAP SPARQL QUERY REQUEST
  # NOTE(review): only "<" and ">" are escaped; a query containing "&"
  # is passed through unchanged -- confirm that the template tolerates it
  escaped_query = q.gsub("<", "&lt;").gsub(">", "&gt;")
  msg = SPARQL_QUERY_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, escaped_query ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()
  ensure
    # the socket is released even when the exchange fails
    tcpc.close()
  end

  # storing last reply
  @last_reply = rmsg

  # increment transaction id
  @transaction_id += 1

  # reading the reply
  r = ReplyMessage.new(rmsg)

  # return
  return r.success?(), r.get_sparql_results()

end
sparql_subscribe(pattern, myHandlerClass = nil) click to toggle source

SPARQL Subscription

# File lib/ruby_kpi.rb, line 533
# Subscribes to a SPARQL pattern with one SSAP SUBSCRIBE request.
#
# pattern        - the SPARQL text, placed in the request as given
# myHandlerClass - optional class instantiated with no arguments; the
#                  instance's handle(added, removed) receives the
#                  indications. When omitted, nil is handed to the
#                  receiver as its handler.
#
# Returns three values: the result of ReplyMessage#success?, the
# subscription id and the initial results
# (ReplyMessage#get_sparql_results).
#
# When the SIB accepts the subscription, the connection stays open, the
# subscription is registered in @active_subscriptions and a thread running
# sparql_indication_receiver is started. When the reply does not report
# success, or an exception is raised, the connection is closed and nothing
# is registered or started.
def sparql_subscribe(pattern, myHandlerClass = nil)

  # debug print (the tag names this method, not rdf_subscribe)
  @logger.debug("KP:sparql_subscribe") if @debug

  # build and store the SSAP SUBSCRIBE REQUEST
  # NOTE(review): unlike sparql_query, the pattern is not escaped ("<" and
  # ">" are sent as they are) -- confirm that the subscribe template
  # expects raw text before changing this
  msg = SPARQL_SUBSCRIBE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, pattern ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  # the socket must outlive this method only for an accepted subscription
  subscribed = false

  begin

    # sending the request and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()

    # storing and reading the last reply
    @last_reply = rmsg
    r = ReplyMessage.new(rmsg)

    # increment transaction id
    @transaction_id += 1

    # get the subscription id and the initial results
    subscription_id = r.get_subscription_id()
    initial_results = r.get_sparql_results()
    success = r.success?()

    if success

      # TODO check if myHandlerClass is a Handler,
      # otherwise raise an exception
      h = myHandlerClass ? myHandlerClass.new() : nil

      # register the subscription and its socket BEFORE starting the
      # receiver, which looks the subscription up when it terminates
      subscription = { "socket" => tcpc }
      @active_subscriptions[subscription_id] = subscription

      # start the indication receiver and store its thread
      subscription["thread"] = Thread.new{sparql_indication_receiver(tcpc, subscription_id, h)}
      subscribed = true

    end

  ensure
    # a rejected or failed subscription must not leave the socket open
    tcpc.close() unless subscribed
  end

  # return
  return success, subscription_id, initial_results

end
unsubscribe(sub_id) click to toggle source

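UNSUBSCRIBE: sends an SSAP UNSUBSCRIBE request for sub_id on a new connection, then waits for that subscription's receiver thread to finish and returns the thread's value.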

# File lib/ruby_kpi.rb, line 593
# Cancels a subscription with one SSAP UNSUBSCRIBE request.
#
# sub_id - id of a subscription present in @active_subscriptions
#
# The request is sent on a new, short-lived connection; the confirmation
# is read by the subscription's receiver thread, so this method waits for
# that thread and returns its value.
#
# Raises ArgumentError, before anything is sent, when sub_id is not an
# active subscription.
def unsubscribe(sub_id)

  # debug print
  @logger.debug("KP:unsubscribe") if @debug

  # fetch the receiver thread BEFORE sending: the receiver removes the
  # subscription from @active_subscriptions as soon as it sees the
  # confirmation, which can happen before this method resumes
  subscription = @active_subscriptions[sub_id]
  if subscription.nil?
    raise ArgumentError, "unknown subscription id: #{sub_id.inspect}"
  end
  receiver = subscription["thread"]

  # building and storing the SSAP UNSUBSCRIBE REQUEST
  msg = UNSUBSCRIBE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, sub_id ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request message
    tcpc.send_request(msg)
  ensure
    # the socket is released even when sending fails
    tcpc.close()
  end

  # increment transaction id
  @transaction_id += 1

  # wait for the receiver thread and return its value
  return receiver.value

end
update(new_triple_list, old_triple_list) click to toggle source

UPDATE

# File lib/ruby_kpi.rb, line 313
# Replaces triples in the smart space with one SSAP UPDATE request.
#
# new_triple_list - collection of triples to insert (must respond to map)
# old_triple_list - collection of triples to remove (must respond to map)
#
# Each triple responds to subject, predicate and object, and each of those
# responds to value. The template receives the new triples before the old
# ones.
#
# Returns the result of ReplyMessage#success? for the SIB's reply.
# Exceptions raised while sending or receiving propagate to the caller;
# the connection is closed first and the transaction id is not advanced.
def update(new_triple_list, old_triple_list)

  # debug print
  @logger.debug("KP:update") if @debug

  # build the triple strings (old list first, as before); the type
  # written for subject and object is the downcased, unqualified name of
  # their class
  old_triple_string, new_triple_string = [old_triple_list, new_triple_list].map { |triples|
    triples.map { |triple|
      TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase,
                         triple.subject.value,
                         triple.predicate.value,
                         triple.object.class.to_s.split("::").last.downcase,
                         triple.object.value]
    }.join
  }

  # building and storing the SSAP UPDATE REQUEST
  msg = UPDATE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, new_triple_string, old_triple_string ]
  @last_request = msg

  # connecting to the SIB
  tcpc = TCPConnection.new(@ip, @port)

  begin
    # sending the request and waiting for a reply
    tcpc.send_request(msg)
    rmsg = tcpc.receive_reply()
  ensure
    # the socket is released even when the exchange fails
    tcpc.close()
  end

  # storing last reply
  @last_reply = rmsg

  # increment transaction id
  @transaction_id += 1

  # parsing the message to get the return value
  r = ReplyMessage.new(rmsg)
  return r.success?()

end