class RubyKpi::KP
The KP
class
Attributes
active_subscriptions[R]
accessors
ip[R]
accessors
last_reply[R]
accessors
last_request[R]
accessors
node_id[R]
accessors
port[R]
accessors
ss[R]
accessors
Public Class Methods
new(ip, port, smart_space, debug = false)
click to toggle source
constructor
# File lib/ruby_kpi.rb, line 87 def initialize(ip, port, smart_space, debug = false) # debug object @debug = debug if @debug @logger = Logger.new(STDOUT) @logger.debug("KP:initialize") end # instance variables @ip = ip @port = port @ss = smart_space @transaction_id = 1 @node_id = UUID.new().generate() @last_request = nil @last_reply = nil @active_subscriptions = {} end
Public Instance Methods
insert(triple_list)
click to toggle source
INSERT
# File lib/ruby_kpi.rb, line 204 def insert(triple_list) # debug print if @debug @logger.debug("KP:insert") end # build the triple_string triple_string = "" triple_list.each do |triple| triple_string += TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase, triple.subject.value, triple.predicate.value, triple.object.class.to_s.split("::").last.downcase, triple.object.value] end # build the SSAP INSERT REQUEST msg = INSERT_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ] # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() # closing the socket tcpc.close() # storing last reply @last_reply = rmsg # storing last request and last reply @last_request = msg @last_reply = rmsg # read the reply r = ReplyMessage.new(rmsg) # increment transaction id @transaction_id += 1 # return return r.success?() end
join_ss()
click to toggle source
join
# File lib/ruby_kpi.rb, line 115 def join_ss() # debug print if @debug @logger.debug("KP:join_ss") end # building and storing the SSAP JOIN REQUEST msg = JOIN_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sending the request message tcpc.send_request(msg) # waiting for a reply rmsg = tcpc.receive_reply() # closing the socket tcpc.close() # storing last reply @last_reply = rmsg # reading message r = ReplyMessage.new(rmsg) # increment transaction id @transaction_id += 1 # return return r.success?() end
leave_ss()
click to toggle source
LEAVE
# File lib/ruby_kpi.rb, line 159 def leave_ss() # debug print if @debug @logger.debug("KP:leave_ss") end # building and storing the SSAP LEAVE REQUEST msg = LEAVE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() ## instantiate a new ReplyMessage r = ReplyMessage.new(rmsg) # closing the socket tcpc.close() # storing last reply @last_reply = rmsg # increment transaction id @transaction_id += 1 # return return r.success?() end
rdf_indication_receiver(tcpc, subscription_id, handler)
click to toggle source
rdf indication receiver
# File lib/ruby_kpi.rb, line 628 def rdf_indication_receiver(tcpc, subscription_id, handler) # debug print if @debug @logger.debug("KP:rdf_indication_receiver") end # Endless loop while true # receive rmsg = tcpc.receive_reply() r = ReplyMessage.new(rmsg) # is it an indication? if r.get_message_type() == "INDICATION" # debug print if @debug @logger.debug("KP:rdf_indication_receiver -- INDICATION") end # call the handler added, removed = r.get_rdf_triples_from_indication() handler.handle(added, removed) # it is an unsubscribe confirm else # close subscription if r.success?() # debug print if @debug @logger.debug("KP:rdf_indication_receiver -- UNSUBSCRIBE CONFIRM") end # save the reply @last_reply = rmsg # close subscription @active_subscriptions[subscription_id]["socket"].close() t = @active_subscriptions[subscription_id]["thread"] @active_subscriptions.delete(subscription_id) # return r = ReplyMessage.new(rmsg) return r.success?() end end end end
rdf_query(triple)
click to toggle source
RDF QUERY
# File lib/ruby_kpi.rb, line 376 def rdf_query(triple) # debug print if @debug @logger.debug("KP:rdf_query") end # build the triple triple_string = TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase, triple.subject.value, triple.predicate.value, triple.object.class.to_s.split("::").last.downcase, triple.object.value] # build and store the SSAP QUERY REQUEST msg = RDF_QUERY_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() # storing and reading the reply @last_reply = rmsg r = ReplyMessage.new(rmsg) # closing the socket tcpc.close() # increment transaction id @transaction_id += 1 # return return r.success?(), r.get_rdf_triples() end
rdf_subscribe(triple, myHandlerClass)
click to toggle source
RDF Subscription
# File lib/ruby_kpi.rb, line 470 def rdf_subscribe(triple, myHandlerClass) # debug print if @debug @logger.debug("KP:rdf_subscribe") end # build the triple triple_string = TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase, triple.subject.value, triple.predicate.value, triple.object.class.to_s.split("::").last.downcase, triple.object.value] # build and store the SSAP QUERY REQUEST msg = RDF_SUBSCRIBE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() # storing and reading the last reply @last_reply = rmsg r = ReplyMessage.new(rmsg) # increment transaction id @transaction_id += 1 # get the subscription id subscription_id = r.get_subscription_id() # Get the initial results triple_list = r.get_rdf_triples() # instantiate the handler class h = myHandlerClass.new() # start the thread t = Thread.new{rdf_indication_receiver(tcpc, subscription_id, h)} # store the subscription id, its socket and its thread @active_subscriptions[subscription_id] = {} @active_subscriptions[subscription_id]["socket"] = tcpc @active_subscriptions[subscription_id]["thread"] = t # return return r.success?(), subscription_id, triple_list end
remove(triple_list)
click to toggle source
REMOVE
# File lib/ruby_kpi.rb, line 261 def remove(triple_list) # debug print if @debug @logger.debug("KP:remove") end # build the triple string triple_string = "" triple_list.each do |triple| triple_string += TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase, triple.subject.value, triple.predicate.value, triple.object.class.to_s.split("::").last.downcase, triple.object.value] end # building and storing the SSAP REMOVE REQUEST msg = REMOVE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, triple_string ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() # closing the socket tcpc.close() # storing last reply @last_reply = rmsg # increment transaction id @transaction_id += 1 # parsing the message to get the return value r = ReplyMessage.new(rmsg) return r.success?() end
sparql_indication_receiver(tcpc, subscription_id, handler)
click to toggle source
sparql indication receiver
# File lib/ruby_kpi.rb, line 689 def sparql_indication_receiver(tcpc, subscription_id, handler) # debug print if @debug @logger.debug("KP:sparql_indication_receiver") end # Endless loop while true # receive rmsg = tcpc.receive_reply() r = ReplyMessage.new(rmsg) # parse the message content = XML::Parser.string(rmsg).parse # is it an indication? if r.get_message_type() == "INDICATION" # debug print if @debug @logger.debug("KP:sparql_indication_receiver -- INDICATION") end # extract triples from the indication and launch the handler added, removed = r.get_sparql_results_from_indication() handler.handle(added, removed) # it is an unsubscribe confirm else # close subscription if r.success?() # debug print if @debug @logger.debug("KP:sparql_indication_receiver -- UNSUBSCRIBE CONFIRM") end # save the reply @last_reply = rmsg # close subscription @active_subscriptions[subscription_id]["socket"].close() t = @active_subscriptions[subscription_id]["thread"] @active_subscriptions.delete(subscription_id) # return r = ReplyMessage.new(rmsg) return r.success?() end end end end
sparql_query(q)
click to toggle source
SPARQL QUERY
# File lib/ruby_kpi.rb, line 425 def sparql_query(q) # debug print if @debug @logger.debug("KP:sparql_query") end # build and store the SSAP SPARQL QUERY REQUEST q = q.gsub("<", "<").gsub(">", ">") msg = SPARQL_QUERY_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, q ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() # closing the socket tcpc.close() # storing last reply @last_reply = rmsg # increment transaction id @transaction_id += 1 # reading the reply r = ReplyMessage.new(rmsg) # return return r.success?(), r.get_sparql_results() end
sparql_subscribe(pattern, myHandlerClass = nil)
click to toggle source
SPARQL Subscription
# File lib/ruby_kpi.rb, line 533 def sparql_subscribe(pattern, myHandlerClass = nil) # debug print if @debug @logger.debug("KP:rdf_subscribe") end # build and store the SSAP QUERY REQUEST msg = SPARQL_SUBSCRIBE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, pattern ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() # storing and reading the last reply @last_reply = rmsg r = ReplyMessage.new(rmsg) # increment transaction id @transaction_id += 1 # get the subscription id subscription_id = r.get_subscription_id() # Get the initial results initial_results = r.get_sparql_results() # start the indication receiver if myHandlerClass # TODO check if myHandlerClass is a Handler, # otherwise raise an exception h = myHandlerClass.new() else h = nil end t = Thread.new{sparql_indication_receiver(tcpc, subscription_id, h)} # store the subscription id and its socket @active_subscriptions[subscription_id] = {} @active_subscriptions[subscription_id]["socket"] = tcpc @active_subscriptions[subscription_id]["thread"] = t # return return r.success?(), subscription_id, initial_results end
unsubscribe(sub_id)
click to toggle source
unsubscribe
# File lib/ruby_kpi.rb, line 593 def unsubscribe(sub_id) # debug print if @debug @logger.debug("KP:unsubscribe") end # building and storing the SSAP UNSUBSCRIBE REQUEST msg = UNSUBSCRIBE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, sub_id ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sending the request message tcpc.send_request(msg) # closing the socket tcpc.close() # increment transaction id @transaction_id += 1 # get the thread and return return @active_subscriptions[sub_id]["thread"].value end
update(new_triple_list, old_triple_list)
click to toggle source
UPDATE
# File lib/ruby_kpi.rb, line 313 def update(new_triple_list, old_triple_list) # debug print if @debug @logger.debug("KP:update") end # build the triple strings old_triple_string = "" old_triple_list.each do |triple| old_triple_string += TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase, triple.subject.value, triple.predicate.value, triple.object.class.to_s.split("::").last.downcase, triple.object.value] end new_triple_string = "" new_triple_list.each do |triple| new_triple_string += TRIPLE_TEMPLATE % [triple.subject.class.to_s.split("::").last.downcase, triple.subject.value, triple.predicate.value, triple.object.class.to_s.split("::").last.downcase, triple.object.value] end # building and storing the SSAP REMOVE REQUEST msg = UPDATE_REQUEST_TEMPLATE % [ @node_id, @ss, @transaction_id, new_triple_string, old_triple_string ] @last_request = msg # connecting to the SIB tcpc = TCPConnection.new(@ip, @port) # sendind the request tcpc.send_request(msg) # waiting a reply rmsg = tcpc.receive_reply() # closing the socket tcpc.close() # storing last reply @last_reply = rmsg # increment transaction id @transaction_id += 1 # parsing the message to get the return value r = ReplyMessage.new(rmsg) return r.success?() end