class Apollo::Agent::FetcherAgent

Constants

THREAD_POOL_SIZE

Attributes

declarations[RW]
fetcher[RW]
mutex[RW]

Public Class Methods

new(amqp, opts={}) click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 41
# Sets up the fetcher agent: opens an AMQP channel, declares the messaging
# entities and subscribes to "fetcher.queue" so that every delivered URL is
# fetched, wrapped into a raw document and optionally answered.
#
# @param amqp [#create_channel] AMQP connection used to open the channel
# @param opts [Hash] options; :verbose enables progress output on stdout.
#   The whole hash is also forwarded to Apollo::Agent.declare_entities and
#   to process_fetched_doc.
def initialize(amqp, opts={})
        self.fetcher = Apollo::Fetcher::SmartFetcher.new

        puts "Initializing fetcher agent..." if opts[:verbose]

        # Declarations
        channel = amqp.create_channel
        channel.prefetch(THREAD_POOL_SIZE)

        # Binding (Default)
        self.declarations = Apollo::Agent.declare_entities(channel, opts)
        queue = declarations[:queues]["fetcher.queue"]

        # AMQP contexts for threads - currently only announced when verbose,
        # no per-thread context object is created here.
        (0...THREAD_POOL_SIZE).each do |i|
                puts "FetcherAgent::initialize() - Creating context #{i}" if opts[:verbose]
        end

        # Lock guarding channel/exchange operations (acks here, publishes in
        # send_response_msg)
        self.mutex = Mutex.new

        exchange = declarations[:exchanges]["fetcher"]

        queue.bind(exchange).subscribe(:ack => true) do |delivery_info, metadata, payload|
                # There can be trouble with concurrency, please see https://groups.google.com/forum/?fromgroups=#!topic/ruby-amqp/aO9GPu-jxuE
                queued_url = JSON.parse(payload)
                url = queued_url["url"]

                puts "FetcherAgent: Received - '#{url}', metadata #{metadata.inspect}" if opts[:verbose]

                # The message is acknowledged BEFORE the fetch runs, so a failed
                # fetch is only logged below and never redelivered.
                # NOTE(review): the second argument `true` is presumably the AMQP
                # "multiple" flag (acks every outstanding tag up to this one) - confirm
                # this is intended together with prefetch(THREAD_POOL_SIZE).
                mutex.synchronize do
                        puts "FetcherAgent: Acking - '#{delivery_info.delivery_tag}'" if opts[:verbose]
                        channel.basic_ack(delivery_info.delivery_tag, true)
                end

                begin
                        doc = Apollo::Fetcher::SmartFetcher.fetch(url)
                        doc = process_fetched_doc(queued_url, doc, metadata, opts)

                        reply_to = metadata && metadata[:reply_to]
                        if reply_to
                                puts "Replying to '#{reply_to}'"
                                send_response_msg(reply_to, queued_url, doc)
                        end
                rescue StandardError => e
                        # Only StandardError is handled: rescuing Exception would also
                        # swallow Interrupt/SystemExit/NoMemoryError and make the agent
                        # impossible to stop cleanly while a fetch is in flight.
                        puts "EXCEPTION: FetcherAgent::initialize() - Unable to fetch '#{url}', reason: '#{e}'"
                end

                # Block value: whatever `doc` holds at this point (nil when the fetch raised)
                doc
        end
end

Public Instance Methods

format_response_msg(queued_url, doc) click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 108
# Builds the reply envelope pairing the original request with its result.
#
# @param queued_url [Object] the request as it was taken from the queue
# @param doc [Object] the outcome produced for that request
# @return [Hash] hash with the keys :request and :response, in that order
def format_response_msg(queued_url, doc)
        envelope = {}
        envelope[:request] = queued_url
        envelope[:response] = doc
        envelope
end
process_fetched_doc(queued_url, doc, metadata, opts={}) click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 95
# Converts a fetched result into an Apollo::Model::RawDocument.
#
# @param queued_url [Hash] queue message; its "url" entry becomes the document url
# @param doc [Hash] fetch result read through the keys :headers, :body and :status
# @param metadata [Object] accepted for interface compatibility, not used here
# @param opts [Hash] accepted for interface compatibility, not used here
# @return [Apollo::Model::RawDocument] document populated with headers, body,
#   the SHA1 hex digest of the body, status and url
def process_fetched_doc(queued_url, doc, metadata, opts={})
        target_url = queued_url["url"]

        Apollo::Model::RawDocument.new.tap do |raw|
                raw.headers = doc[:headers]
                content = doc[:body]
                raw.body = content
                # Fingerprint of the body content
                raw.sha_hash = Digest::SHA1.hexdigest(content)
                raw.status = doc[:status]
                raw.url = target_url
        end
end
send_response_msg(dest, queued_url, doc) click to toggle source
# File lib/apollo_crawler/agent/fetcher_agent.rb, line 115
# Publishes the response for a processed request to the exchange named +dest+.
# Does nothing (and returns nil) when +dest+ is nil.
#
# @param dest [Object, nil] key of the target exchange in declarations[:exchanges]
# @param queued_url [Object] the original request
# @param doc [Object] the result to send back
# @return [Object, nil] the value of the exchange's publish call, or nil when skipped
def send_response_msg(dest, queued_url, doc)
        return if dest.nil?

        response = format_response_msg(queued_url, doc)

        # Lookup and publish happen under the agent's mutex
        mutex.synchronize do
                target = declarations[:exchanges][dest]
                target.publish(response.to_json)
        end
end