class Apollo::Planner::SmartPlanner
Attributes
amqp[RW]
declarations[RW]
mongo[RW]
Public Class Methods
new(amqp=nil, mongo=nil, opts={})
click to toggle source
# File lib/apollo_crawler/planner/smart_planner.rb, line 38 def initialize(amqp=nil, mongo=nil, opts={}) self.amqp = amqp self.mongo = mongo # Declarations channel = amqp.create_channel # channel.prefetch(1) self.declarations = Apollo::Agent.declare_entities(channel, opts) # Bindings declarations[:queues]["planner.fetched.queue"].bind(declarations[:exchanges]["planner.fetched"]).subscribe do |delivery_info, metadata, payload| msg = JSON.parse(payload) puts "#{msg.inspect}" if opts[:verbose] puts "REQ: #{msg['request']}" if opts[:verbose] puts "RESP: #{msg['response']}" if opts[:verbose] request = msg['request'] response = msg['response'] doc = Apollo::Model::QueuedUrl.find(request["_id"]) doc.update_attributes(msg['request']) doc.state = :fetched doc.save doc = Apollo::Model::RawDocument.where(:url => request['url']).first if doc if doc.sha_hash != response['sha_hash'] puts "Removing old cached version of '#{request['url']}'" if opts[:verbose] doc.destroy doc = nil else puts "Using cached version of '#{request['url']}'" if opts[:verbose] end else doc = Apollo::Model::RawDocument.where(:sha_hash => response['sha_hash']).first if(doc.nil? == false) puts "Same as #{doc.inspect}" end end if(doc.nil?) doc = Apollo::Model::RawDocument.new(response).save # Publish declarations[:exchanges]["crawler"].publish(msg.to_json, :reply_to => "planner.crawled") end end declarations[:queues]["planner.domained.queue"].bind(declarations[:exchanges]["planner.domained"]).subscribe do |delivery_info, metadata, payload| msg = JSON.parse(payload) puts "DOMAINED !!!" end declarations[:queues]["planner.crawled.queue"].bind(declarations[:exchanges]["planner.crawled"]).subscribe do |delivery_info, metadata, payload| msg = JSON.parse(payload) # puts "Crawled - #{msg.inspect}" request = msg['request'] response = msg['response'] data = msg['data'] links = msg['links'] links = [] if links.nil? data_hash = Digest::SHA256.new.update(data).hexdigest puts "#{data_hash}" links.each do |url| link = url['link'] Apollo::Scheduler::BaseScheduler::schedule(link, request['crawler_name']) end # puts JSON.pretty_generate(data) # puts JSON.pretty_generate(links) end end
Public Instance Methods
fetch_queued_urls(opts={})
click to toggle source
# File lib/apollo_crawler/planner/smart_planner.rb, line 138 def fetch_queued_urls(opts={}) fetching_count = Apollo::Model::QueuedUrl.where({:state => :fetching}).count if(fetching_count > 4) puts "Fetching too many URLs. Slowing down for a while ..." return end while get_url_count(:fetching) < 4 url = get_next_url(opts) puts "SmartPlanner::fetch_queued_urls() - Queueing: #{url.inspect}" fetch_url(url, opts) end end
fetch_url(url, opts={})
click to toggle source
# File lib/apollo_crawler/planner/smart_planner.rb, line 123 def fetch_url(url, opts={}) if(opts[:verbose]) puts "AMQP fetching '#{url.inspect}'" end # Publish declarations[:exchanges]["fetcher"].publish(url.to_json, :reply_to => "planner.fetched") end
get_next_url(opts={})
click to toggle source
# File lib/apollo_crawler/planner/smart_planner.rb, line 132 def get_next_url(opts={}) tmp = Apollo::Model::QueuedUrl.where({:state => :queued}).order_by(:created_at.asc) res = tmp.find_and_modify({ "$set" => { state: :fetching }}, new: true) return res end
get_url_count(state, opts={})
click to toggle source
# File lib/apollo_crawler/planner/smart_planner.rb, line 119 def get_url_count(state, opts={}) Apollo::Model::QueuedUrl.where({:state => state}).count end
run(opts={})
click to toggle source
# File lib/apollo_crawler/planner/smart_planner.rb, line 153 def run(opts={}) request_exit = false while request_exit == false fetch_queued_urls(opts) sleep 1 end return 0 end