class NewsCrawler::Storage::URLQueue::MongoEngine
URL queue storage engine with a MongoDB backend
Constants
- NAME
Public Class Methods
Construct a queue
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 37
  def initialize(*opts)
    config = SimpleConfig.for :application
    db = MongoClient.new(config.mongodb.host, config.mongodb.port,
                         pool_size: 4, pool_timeout: 5)[config.mongodb.db_name]
    coll_name = config.prefix + '_' + config.suffix.url_queue
    h_opts = ((opts[-1].is_a? Hash) ? opts[-1] : {})
    @coll = db[h_opts[:coll_name] || coll_name]
    @coll.ensure_index({:url => Mongo::ASCENDING}, {:unique => true})
  end
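A minimal construction sketch. It assumes a SimpleConfig :application block providing the mongodb host, port, db_name, prefix and suffix settings the initializer reads; all values shown are placeholders, not project defaults, and :coll_name is the hash option the constructor already accepts.

  SimpleConfig.for :application do
    group :mongodb do
      set :host, 'localhost'        # placeholder values
      set :port, 27017
      set :db_name, 'news_crawler'
    end
    set :prefix, 'nc'
    group :suffix do
      set :url_queue, 'url_queue'
    end
  end

  queue = NewsCrawler::Storage::URLQueue::MongoEngine.new
  # or override the collection name explicitly
  queue = NewsCrawler::Storage::URLQueue::MongoEngine.new(coll_name: 'my_url_queue')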
Public Instance Methods
Add a URL to the queue with a reference URL
@param [ String ] url
@param [ String ] ref_url
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 51
  def add(url, ref_url = '')
    if (ref_url == '')
      depth = 0
    else
      depth = (get_url_depth(ref_url) || 0) + 1
    end
    begin
      @coll.insert({:url => url, :depth => depth, :visited => false})
    rescue Mongo::OperationFailure => e
      if e.error_code == 11000  # duplicate key error
        raise DuplicateURLError, url
      else
        raise e
      end
    end
  end
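A usage sketch for add, assuming the queue object from the construction example above and assuming DuplicateURLError lives in the URLQueue namespace; the URLs are placeholders.

  queue.add('http://example.com/')                                 # seed URL, depth 0
  queue.add('http://example.com/news/1', 'http://example.com/')    # depth 1, via reference URL

  begin
    queue.add('http://example.com/')                               # already queued
  rescue NewsCrawler::Storage::URLQueue::DuplicateURLError
    # duplicates discovered while crawling can simply be ignored
  end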
Get all URLs and their status
@return [ Array ] array of hashes, each containing a url and its status
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 113
  def all(*opts)
    @coll.find.collect do | entry |
      entry.each_key.inject({}) do | memo, key |
        if key != '_id'
          memo[key.intern] = entry[key]
        end
        memo
      end
    end
  end
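A sketch of the return shape under the assumptions above: every document becomes a hash with its keys interned to symbols and the _id field dropped. The module name and values are illustrative only.

  queue.all
  # => [{:url => 'http://example.com/', :depth => 0, :visited => true,
  #      :my_module => 'processed'},
  #     {:url => 'http://example.com/news/1', :depth => 1, :visited => false}]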
Clear the URL queue
@return [ Fixnum ] number of URLs removed
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 202
  def clear(*opts)
    count = @coll.count
    @coll.remove
    count
  end
TODO fix bug - find visited url
Find all visited URLs with the given module processing state
@param [ String ] modul_name
@param [ String ] state one of unprocessed, processing, processed
@param [ Fixnum ] max_depth maximum URL depth returned (inclusive)
@return [ Array ] URL list
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 130
  def find_all(modul_name, state, max_depth = -1)
    if (state == URLQueue::UNPROCESSED)
      selector = {:$or => [{modul_name => state},
                           {modul_name => {:$exists => false}}]}
    else
      selector = {modul_name => state}
    end
    selector = {:$and => [selector, {'visited' => true}]}
    if max_depth > -1
      selector[:$and] << {'depth' => {:$lte => max_depth}}
    end
    @coll.find(selector).collect do | entry |
      entry['url']
    end
  end
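A hypothetical query sketch. 'my_module' is a placeholder module name; URLQueue::UNPROCESSED is the constant used in the source, while the plain 'processed' string follows the state names documented above. Note that only visited URLs are matched, per the selector.

  # visited URLs 'my_module' has not touched yet, at depth 0 or 1
  queue.find_all('my_module', NewsCrawler::Storage::URLQueue::UNPROCESSED, 1)

  # every visited URL 'my_module' already finished
  queue.find_all('my_module', 'processed')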
Find one visited URL with the given module processing state
@param [ String ] modul_name
@param [ String ] state one of unprocessed, processing, processed
@param [ Fixnum ] max_depth maximum URL depth returned (inclusive)
@return [ String, nil ] URL, or nil if no URL matches the criteria
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 152
  def find_one(modul_name, state, max_depth = -1)
    a = find_all(modul_name, state, max_depth)
    if a.size > 0
      a[0]
    else
      nil
    end
  end
Get the list of unvisited URLs
@param [ Fixnum ] max_depth maximum depth of URLs returned (optional)
@return [ Array ] unvisited URLs, limited to the maximum depth when given
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 188
  def find_unvisited(max_depth = -1)
    if max_depth > -1
      selector = {:$and => [{'visited' => false},
                            {'depth' => {:$lte => max_depth}}]}
    else
      selector = {'visited' => false}
    end
    @coll.find(selector).collect do | entry |
      entry['url']
    end
  end
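A short fetcher-loop sketch using the queue object from above; the download step is a placeholder.

  queue.find_unvisited(2).each do |url|
    # fetch url here (placeholder for the actual download step)
    queue.mark_visited(url)
  end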
Get the depth of the given URL
@param [ String ] url
@return [ Fixnum ] URL depth
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 211
  def get_url_depth(url)
    # guard against unknown URLs so callers (e.g. add) receive nil
    # instead of a NoMethodError on a missing document
    doc = @coll.find_one({'url' => url}, {:fields => ['depth']})
    doc && doc['depth']
  end
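For illustration, with the placeholder URLs added in the earlier add sketch:

  queue.get_url_depth('http://example.com/')         # => 0
  queue.get_url_depth('http://example.com/news/1')   # => 1
  queue.get_url_depth('http://not-queued.example/')  # => nil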
Set the processing state of a URL for the given module
@param [ String ] module_name
@param [ String ] url
@param [ String ] state one of unprocessed, processing, processed
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 95
  def mark(module_name, url, state)
    @coll.update({:url => url}, {:$set => {module_name => state}})
  end
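A sketch of a module recording its own progress; the module name, URL, and the plain state strings (taken from the state names documented above) are placeholders.

  queue.mark('my_module', 'http://example.com/', 'processing')
  # ... do the module's work ...
  queue.mark('my_module', 'http://example.com/', 'processed')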
Change all URLs in one state to another state for the given module
@param [ String ] module_name
@param [ String ] new_state new state
@param [ String ] orig_state original state
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 104
  def mark_all(module_name, new_state, orig_state = nil)
    selector = (orig_state.nil? ? {} : {module_name => orig_state})
    @coll.update(selector, {:$set => {module_name => new_state}}, :multi => true)
  end
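A recovery-style sketch, e.g. requeueing work that was stuck in the processing state when a crawler stopped; module name and state strings are placeholders following the documented state names.

  # push everything 'my_module' left half-done back to unprocessed
  queue.mark_all('my_module', 'unprocessed', 'processing')

  # or reset the module's state on every URL, regardless of current state
  queue.mark_all('my_module', 'unprocessed')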
Mark all URLs as unvisited
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 78
  def mark_all_unvisited
    @coll.update({}, {:$set => {'visited' => false}}, {:multi => true})
  end
Mark a URL as visited
@param [ String ] url
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 72
  def mark_visited(url)
    @coll.update({:url => url}, {:$set => {'visited' => true}})
  end
Atomically get the next unprocessed URL and mark it as processing
@param [ String ] modul_name
@param [ Fixnum ] max_depth maximum URL depth returned (inclusive)
@return [ String, nil ] URL, or nil if no such URL exists
  # File lib/news_crawler/storage/url_queue/mongo_storage.rb, line 165
  def next_unprocessed(modul_name, max_depth = -1)
    selector = {:$or => [{modul_name => URLQueue::UNPROCESSED},
                         {modul_name => {:$exists => false}}]}
    selector = {:$and => [selector, {'visited' => true}]}
    if max_depth > -1
      selector[:$and] << {'depth' => {:$lte => max_depth}}
    end
    doc = @coll.find_and_modify(:query  => selector,
                                :update => {:$set => {modul_name => URLQueue::PROCESSING}})
    (doc.nil? ? nil : doc['url'])
  end
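A worker-loop sketch combining next_unprocessed and mark. It assumes a URLQueue::PROCESSED constant exists alongside the UNPROCESSED and PROCESSING constants used in the source; the module name and the processing step are placeholders.

  while (url = queue.next_unprocessed('my_module', 2))
    begin
      # process url here (placeholder)
      queue.mark('my_module', url, NewsCrawler::Storage::URLQueue::PROCESSED)
    rescue => e
      # hand the URL back to the queue before propagating the failure
      queue.mark('my_module', url, NewsCrawler::Storage::URLQueue::UNPROCESSED)
      raise e
    end
  end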