class Apollo::Agent::FetcherAgent
Constants
- THREAD_POOL_SIZE
Attributes
declarations[RW]
fetcher[RW]
mutex[RW]
Public Class Methods
new(amqp, opts={})
click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 41
# Sets up the fetcher agent: declares the AMQP entities on a fresh channel,
# binds "fetcher.queue" to the "fetcher" exchange and subscribes to it.
# Each delivery is expected to be a JSON object carrying a "url" key; the URL
# is fetched, wrapped by #process_fetched_doc and, when the message metadata
# carries :reply_to, sent back via #send_response_msg.
#
# Deliveries are acknowledged on receipt (before fetching), so a fetch that
# fails is logged and not redelivered.
#
# @param amqp [Object] connection exposing +create_channel+ (presumably a
#   Bunny session - TODO confirm)
# @param opts [Hash] options; +:verbose+ enables progress logging. The hash is
#   also forwarded to Apollo::Agent.declare_entities and #process_fetched_doc.
def initialize(amqp, opts={})
  self.fetcher = Apollo::Fetcher::SmartFetcher.new

  puts "Initializing fetcher agent..." if opts[:verbose]

  # Declarations
  channel = amqp.create_channel
  channel.prefetch(THREAD_POOL_SIZE)

  # Binding (Default)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)
  queue = declarations[:queues]["fetcher.queue"]

  # Only logs; no per-thread AMQP context object is created here.
  if opts[:verbose]
    THREAD_POOL_SIZE.times { |i| puts "FetcherAgent::initialize() - Creating context #{i}" }
  end

  # Serialises channel operations (acks and publishes) across consumer threads.
  self.mutex = Mutex.new

  exchange = declarations[:exchanges]["fetcher"]
  queue.bind(exchange).subscribe(:ack => true) do |delivery_info, metadata, payload|
    # There can be troubles with concurency, please see https://groups.google.com/forum/?fromgroups=#!topic/ruby-amqp/aO9GPu-jxuE
    begin
      queued_url = JSON.parse(payload)
      url = queued_url["url"]
    rescue StandardError => e
      # A payload that is not a JSON object with a "url" key can never be
      # processed; ack it so it does not hold a prefetch slot forever.
      puts "EXCEPTION: FetcherAgent::initialize() - Unable to parse payload #{payload.inspect}, reason: '#{e}'"
      ack_delivery(channel, delivery_info, opts)
      next
    end

    puts "FetcherAgent: Received - '#{url}', metadata #{metadata.inspect}" if opts[:verbose]
    ack_delivery(channel, delivery_info, opts)

    begin
      doc = Apollo::Fetcher::SmartFetcher.fetch(url)
      doc = process_fetched_doc(queued_url, doc, metadata, opts)

      if metadata && metadata[:reply_to]
        puts "Replying to '#{metadata[:reply_to]}'"
        send_response_msg(metadata[:reply_to], queued_url, doc)
      end
    rescue StandardError => e
      # StandardError rather than Exception so signals / SystemExit still propagate.
      puts "EXCEPTION: FetcherAgent::initialize() - Unable to fetch '#{url}', reason: '#{e.to_s}'"
    end

    doc
  end
end

# Acknowledges exactly one delivery while holding the agent mutex.
# Acking with multiple=true would also ack every earlier outstanding delivery
# on the channel; if deliveries are handled out of order, that can confirm
# messages still in flight and later re-ack an already-acked tag.
def ack_delivery(channel, delivery_info, opts)
  mutex.synchronize do
    puts "FetcherAgent: Acking - '#{delivery_info.delivery_tag}'" if opts[:verbose]
    channel.basic_ack(delivery_info.delivery_tag, false)
  end
end
private :ack_delivery
Public Instance Methods
format_response_msg(queued_url, doc)
click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 108
# Pairs a job request with its fetch result, forming the reply message body.
#
# @param queued_url [Object] the request as received (the parsed job payload)
# @param doc [Object] the fetch result to send back
# @return [Hash] a hash with +:request+ and +:response+ keys
def format_response_msg(queued_url, doc)
  { request: queued_url, response: doc }
end
process_fetched_doc(queued_url, doc, metadata, opts={})
click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 95
# Wraps a raw fetch result into an Apollo::Model::RawDocument.
#
# @param queued_url [Hash] the job payload; its "url" entry becomes the document URL
# @param doc [Hash] fetch result read via :headers, :body and :status
# @param metadata [Object] unused here
# @param opts [Hash] unused here
# @return [Apollo::Model::RawDocument] the populated document, with +sha_hash+
#   set to the SHA-1 hex digest of the body
def process_fetched_doc(queued_url, doc, metadata, opts={})
  source_url = queued_url["url"]

  Apollo::Model::RawDocument.new.tap do |raw|
    raw.headers  = doc[:headers]
    raw.body     = doc[:body]
    raw.sha_hash = Digest::SHA1.hexdigest(doc[:body])
    raw.status   = doc[:status]
    raw.url      = source_url
  end
end
send_response_msg(dest, queued_url, doc)
click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 115
# Publishes the request/response pair, serialised as JSON, to the exchange
# registered under +dest+ in declarations[:exchanges]. The lookup and the
# publish both happen while holding the agent mutex.
#
# @param dest [Object, nil] key into declarations[:exchanges]; nil means "no reply"
# @param queued_url [Object] the original job payload
# @param doc [Object] the processed document
# @return [Object, nil] the publish call's result, or nil when +dest+ is nil
def send_response_msg(dest, queued_url, doc)
  return if dest.nil?

  response = format_response_msg(queued_url, doc)
  mutex.synchronize do
    target = declarations[:exchanges][dest]
    target.publish(response.to_json)
  end
end