class Object
Public Instance Methods
do_notifyDemand(name,arg,msg)
click to toggle source
# File lib/fluent/plugin/provider-util.rb, line 34
#
# Builds an Api::Demand message and sends it through the global
# Synerex stub ($sxstub), then logs the response.
#
# name - value stored in the request's demand_name field.
# arg  - value stored in the request's arg_json field.
# msg  - value wrapped in an Api::Content as its entity.
#
# Reads the globals $clientID (sender id) and $sxstub, and obtains the
# request id from generateIntID.
#
# Returns whatever GRPC.logger.info returns for the response log line.
def do_notifyDemand(name, arg, msg)
  # Timestamp the request with the current wall-clock time.
  ts = Google::Protobuf::Timestamp.new
  ts.from_time(Time.now)

  cdata = Api::Content.new(entity: msg)
  req = Api::Demand.new(
    id: generateIntID(),
    sender_id: $clientID,
    target_id: 0,
    channel_type: 7, # same channel type the supply/register calls tag as fluentd
    demand_name: name,
    ts: ts,
    arg_json: arg,
    mbus_id: 0,
    cdata: cdata
  )
  # GRPC.logger.info("NotifyDemand Call #{req.inspect}")

  resp = $sxstub.notify_demand(req)
  GRPC.logger.info("NotifyDemand Response #{resp.inspect}")
end
do_notifySupply(name,arg,msg)
click to toggle source
# File lib/fluent/plugin/provider-util.rb, line 57
#
# Builds an Api::Supply message and sends it through the global
# Synerex stub ($sxstub), then logs the response.
#
# name - value stored in the request's supply_name field.
# arg  - value stored in the request's arg_json field.
# msg  - value wrapped in an Api::Content as its entity.
#
# Reads the globals $clientID (sender id) and $sxstub, and obtains the
# request id from generateIntID.
#
# Returns whatever GRPC.logger.info returns for the response log line.
def do_notifySupply(name, arg, msg)
  stamp = Google::Protobuf::Timestamp.new
  # $log.puts("Do notify Supply:"+ts.to_s+":"+$clientID.to_s)
  stamp.from_time(Time.now)

  content = Api::Content.new(entity: msg)
  supply = Api::Supply.new(
    id: generateIntID(),
    sender_id: $clientID,
    target_id: 0,
    channel_type: 7, # for fluentd
    supply_name: name,
    ts: stamp,
    arg_json: arg,
    mbus_id: 0,
    cdata: content
  )
  # GRPC.logger.info("NotifySupply Call #{req.inspect}")

  answer = $sxstub.notify_supply(supply)
  GRPC.logger.info("NotifySupply Response #{answer.inspect}")
end
do_registerNode(stub,name)
click to toggle source
# File lib/fluent/plugin/provider-util.rb, line 106
#
# Registers this process as a PROVIDER node via the given node-server stub.
#
# stub - object responding to register_node (the node-server gRPC stub).
# name - value stored in the request's node_name field.
#
# Logs before the call and logs the answer afterwards.
#
# Returns the response object produced by stub.register_node.
def do_registerNode(stub, name)
  GRPC.logger.info("Register Node")

  registration = Nodeapi::NodeInfo.new(
    node_name: name,
    node_type: Nodeapi::NodeType::PROVIDER,
    server_info: "",
    node_pbase_version: "0.1.2",
    with_node_id: -1,
    cluster_id: 0,
    area_id: "Default",
    channelTypes: [7] # fluentd
  )

  # tap logs the answer and still hands it back to the caller.
  stub.register_node(registration).tap do |answer|
    GRPC.logger.info("RN:Answer: #{answer.inspect}")
  end
end
generateIntID()
click to toggle source
# File lib/fluent/plugin/provider-util.rb, line 122
#
# Returns the next id from the global id generator ($idgen).
def generateIntID
  $idgen.next_id
end
keepAlive()
click to toggle source
# File lib/fluent/plugin/provider-util.rb, line 83
#
# Keep-alive loop: for as long as $nodeInfo['secret'] is non-zero, sleeps
# for $nodeInfo['keepalive_duration'], increments $updateCount, sends a
# Nodeapi::NodeUpdate through $nodestub.keep_alive and logs the response.
#
# Blocks the calling thread until the secret becomes 0.
def keepAlive
  until $nodeInfo['secret'] == 0
    sleep($nodeInfo['keepalive_duration'])
    $updateCount += 1

    update = Nodeapi::NodeUpdate.new(
      node_id: $nodeInfo['node_id'],
      secret: $nodeInfo['secret'],
      update_count: $updateCount,
      node_status: 0,
      node_arg: $status
    )

    reply = $nodestub.keep_alive(update)
    GRPC.logger.info("Response for #{$nodeInfo['node_id']}, #{reply.inspect}")
  end
end
registerServ(nodesv,name)
click to toggle source
# File lib/fluent/plugin/provider-util.rb, line 126
#
# Connects to the node server, registers this node, sets up the id
# generator and the Synerex server stub, then starts the keep-alive thread.
#
# nodesv - address passed to Nodeapi::Node::Stub.new.
# name   - node name forwarded to do_registerNode.
#
# Assigns the globals $nodestub, $nodeInfo, $idgen, $clientID and $sxstub.
#
# Returns whatever startKeepAlive returns.
def registerServ(nodesv, name)
  # connect to nodeserv
  $nodestub = Nodeapi::Node::Stub.new(nodesv, :this_channel_is_insecure, timeout: INFINITE_FUTURE)
  GRPC.logger.info(".. connecting insecurely on nodeserv #{nodesv}")

  $nodeInfo = do_registerNode($nodestub, name)
  # got server info

  # Snowflake epoch in milliseconds (the Twitter epoch, 1288834974657, minus
  # its millisecond part). Built with Time.utc so the value is the same fixed
  # instant on every host; Time.new would interpret the fields in the local
  # time zone and shift the epoch by the host's UTC offset.
  service_epoch = Time.utc(2010, 11, 4, 1, 42, 54).to_i * 1000
  $idgen = AnyFlake.new(service_epoch, $nodeInfo.node_id)
  $clientID = generateIntID()

  GRPC.logger.info(".. connecting insecurely on synerex server to #{$nodeInfo.server_info}")
  $sxstub = Api::Synerex::Stub.new($nodeInfo.server_info, :this_channel_is_insecure, timeout: INFINITE_FUTURE)
  startKeepAlive
end
startKeepAlive()
click to toggle source
# File lib/fluent/plugin/provider-util.rb, line 100
#
# Spawns a thread that runs keepAlive and appends it to the global
# $threads collection.
#
# Returns the result of the append ($threads << thread).
def startKeepAlive
  worker = Thread.new { keepAlive }
  $threads << worker
end