class Datahen::Scraper::BatchParser
Constants
- NOT_FOUND_MSG
- NO_DEQUEUE_COUNT_MSG
- NO_WORKERS_MSG
Attributes
- client: Datahen job pages client used for API page dequeuing. @return [Datahen::Client::JobPage] datahen job pages API client.
- config: Current config file loaded. @return [Hash] current loaded configuration.
- config_file: Configuration file path. @return [String] config file path.
- dequeue_interval: Dequeue interval in seconds. @return [Integer] dequeue interval in seconds.
- dequeue_mutex: Dequeuer mutex used to synchronize page dequeuing. @return [Mutex] dequeuer mutex.
- dequeue_scale: Dequeue scale used to calculate the ideal dequeue size. @return [Numeric] dequeue scale.
- dequeue_timeout: Dequeue API request timeout in seconds. @return [Integer] dequeue API request timeout in seconds.
- dequeuer_still_alive: Dequeuer's last-run unix timestamp. @return [Integer] dequeuer last run unix timestamp.
- dequeuer_thread: Current dequeuer thread. @return [Thread] dequeuer thread.
- garbage_count: Garbage collector request counter. @return [Integer] garbage collector counter.
- garbage_mutex: Garbage collector mutex used to synchronize garbage collector requests. @return [Mutex] garbage collector mutex.
- job_id: Job id to be executed. @return [Integer] job id.
- last_message: Last printed message, useful to prevent duplicate log messages. @return [String] last printed message.
- loaded_pages: Loaded pages hash, useful to avoid duplicates in the loaded pages array. @return [Concurrent::Hash<String, Hash>] loaded pages as a concurrent hash.
- max_garbage: Max garbage collector requests before actually executing the garbage collector. @return [Integer] max garbage request quantity before actually executing it.
- not_found: Indicates whether the wait time is because there are no more pages. @return [Boolean] `true` when the wait time is due to no more pages, else `false`.
- page_types: Known page types extracted from the config file. @return [Array<String>] known page types.
- pages: Loaded pages array. @return [Concurrent::Array<Hash>] loaded pages as an array.
- parsers: Known parsers extracted from the config file. @return [Concurrent::Hash<String, String>] known parsers.
- second_dequeue_count: Second dequeue counter used to prevent false-negative warning messages. @return [Integer] second dequeue counter.
- worker_count: Parallel worker quantity. @return [Integer] parallel worker quantity.
Public Class Methods
Initialize a batch parser object.
@param [Integer] job_id Job id.
@param [String] config_file Config file path.
@param [Hash] opts ({}) Configuration options.
@option opts [Integer] :worker_count (1) Parallel worker quantity.
@option opts [Integer] :max_garbage (5) Max amount of times the garbage collector can be requested before actually executing.
@option opts [Integer] :dequeue_interval (3) Time in seconds to wait between page dequeues.
@option opts [Numeric] :dequeue_scale (2) Scaling factor used to calculate the page dequeue size.
@option opts [Numeric] :dequeue_timeout (30) Page dequeue API request timeout in seconds.
@option opts [Hash] :client_options ({}) Additional options for the Datahen client gem (see the Datahen::Client::Base#initialize method).
  # File lib/datahen/scraper/batch_parser.rb, line 105
  def initialize(job_id, config_file, opts = {})
    opts = {
      worker_count: 1,
      max_garbage: 5,
      dequeue_interval: 3,
      dequeue_scale: 2,
      dequeue_timeout: 30,
      client_options: {}
    }.merge opts

    @job_id = job_id
    @worker_count = opts[:worker_count]
    @dequeue_interval = opts[:dequeue_interval]
    @dequeue_scale = opts[:dequeue_scale]
    @max_garbage = opts[:max_garbage]
    @pages = Concurrent::Array.new
    @loaded_pages = Concurrent::Hash.new
    @garbage_mutex = Mutex.new
    @dequeue_mutex = Mutex.new
    @not_found = false
    self.dequeue_timeout = opts[:dequeue_timeout]
    self.second_dequeue_count = 0
    self.garbage_count = 0
    self.config_file = config_file
    self.load_config

    @client = Datahen::Client::JobPage.new(opts[:client_options])
    nil
  end
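A minimal construction sketch (the job id, config file path, and option values below are hypothetical; the path must point at an existing scraper config, since the constructor loads it right away):

  require 'datahen'

  parser = Datahen::Scraper::BatchParser.new(
    12345,           # hypothetical job id
    'config.yaml',   # hypothetical config file path
    worker_count: 4,
    dequeue_scale: 2
  )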
Get a unix timestamp.
@return [Integer] unix timestamp

  # File lib/datahen/scraper/batch_parser.rb, line 86
  def self.timestamp
    Time.new.utc.to_i
  end
Wait a specific amount of seconds.
@param [Integer] time_in_seconds Seconds to wait.

  # File lib/datahen/scraper/batch_parser.rb, line 80
  def self.wait time_in_seconds
    Kernel.sleep time_in_seconds
  end
Public Instance Methods
Dequeue one page from the previously loaded pages, waiting for new pages to arrive whenever none are loaded.
@return [Hash] dequeued page

  # File lib/datahen/scraper/batch_parser.rb, line 291
  def dequeue_pages
    # collect garbage
    self.recollect_garbage

    # return page if there are loaded pages
    is_waiting = false
    while true do
      page = self.pages.shift
      unless page.nil?
        puts "[Worker #{Parallel.worker_number}]: Finish waiting" if is_waiting
        loaded_pages.delete(page['gid'])
        return page
      end

      # be more verbose on worker waiting
      unless is_waiting
        is_waiting = true
        puts "[Worker #{Parallel.worker_number}]: Is waiting for a page..."
        if self.second_dequeue_count > 1 && !self.not_found
          puts "\nWARNING: Your job is not optimized, increase your job's \"parser_dequeue_scale\"\n"
        end
      end
      self.class.wait 1

      # ensure the dequeuer thread is alive and healthy
      self.ensure_dequeuer_thread
    end
  end
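In practice this method is consumed by exec_parse's worker block, but a direct call sketch (reusing the hypothetical parser instance from the constructor example) looks like this:

  # Blocks until the dequeuer thread has loaded at least one page.
  page = parser.dequeue_pages
  puts page['gid'], page['page_type']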
Refresh the dequeuer's still-alive timestamp.

  # File lib/datahen/scraper/batch_parser.rb, line 183
  def dequeuer_is_alive!
    self.dequeue_mutex.synchronize do
      @dequeuer_still_alive = self.class.timestamp
    end
    nil
  end
Ensure that the dequeuer thread exists and is running.
@return [Boolean] `true` if the thread was alive, or `false` if a new thread had to be created.

  # File lib/datahen/scraper/batch_parser.rb, line 256
  def ensure_dequeuer_thread
    self.dequeue_mutex.synchronize do
      # check if dequeuer thread is alive and healthy
      if !self.dequeuer_thread.nil? && self.dequeuer_thread.alive?
        still_alive_timeout = (self.dequeue_timeout + self.dequeue_interval) * 2 + self.dequeuer_still_alive
        return true if self.class.timestamp < still_alive_timeout

        # kill dequeuer thread
        self.repeat_puts "Dequeuer isn't healthy, will restart it..."
        self.dequeuer_thread.kill
        @dequeuer_thread = nil
        self.recollect_garbage
        self.no_repeat_puts "Dequeuer thread was killed!"
      end

      # dequeuing on parallel (the ride never ends :D)
      @dequeuer_thread = Thread.new do
        while true
          begin
            self.load_pages
            self.class.wait self.dequeue_interval
          rescue => e
            puts [e.message] + e.backtrace rescue 'error'
          end
        end
        puts "Error: dequeuer died! D:"
      end
      self.repeat_puts "Dequeuer thread was started!"
    end
    false
  end
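To make the health check concrete: the dequeuer counts as stale when no dequeuer_is_alive! refresh has happened within (dequeue_timeout + dequeue_interval) * 2 seconds, so under the default options:

  dequeue_timeout  = 30  # default
  dequeue_interval = 3   # default
  stale_after = (dequeue_timeout + dequeue_interval) * 2  # => 66 seconds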
Dequeue pages and execute their associated parsers in parallel.

  # File lib/datahen/scraper/batch_parser.rb, line 321
  def exec_parse save = false, keep_outputs = false
    if self.worker_count < 1
      self.no_repeat_puts NO_WORKERS_MSG
      return
    else
      self.no_repeat_puts "Spawning #{self.worker_count} workers"
    end

    # start dequeuer
    self.ensure_dequeuer_thread

    # process the pages
    dequeue = lambda{ self.dequeue_pages }
    Parallel.each(dequeue, in_threads: (worker_count)) do |page|
      parser_file = self.parsers[page['page_type']]
      begin
        self.repeat_puts("Parsing page with GID #{page['gid']}")
        puts Datahen::Scraper::Parser.exec_parser_by_page(
          parser_file,
          page,
          job_id,
          save,
          nil,
          keep_outputs
        )
        self.repeat_puts("Finish parsing page with GID #{page['gid']}")
      rescue Parallel::Kill => e
        puts "[Worker #{Parallel.worker_number}]: Someone tried to kill Parallel!!!"
      rescue Parallel::Break => e
        puts "[Worker #{Parallel.worker_number}]: Someone tried to break Parallel!!!"
      rescue => e
        puts [e.message] + e.backtrace rescue 'error'
      end
    end

    nil
  end
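Continuing the hypothetical instance from the constructor example, a typical run that saves outputs and keeps them around would be:

  # Both flags are forwarded to Datahen::Scraper::Parser.exec_parser_by_page.
  parser.exec_parse(true, true)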
Loads the config file into a Hash.
  # File lib/datahen/scraper/batch_parser.rb, line 150
  def load_config
    # build page type to script file map
    @page_types = []
    @parsers = Concurrent::Hash.new
    @config = YAML.load_file(config_file)
    self.config['parsers'].each do |v|
      next if !v['disabled'].nil? && !!v['disabled']
      @page_types << v['page_type']
      self.parsers[v['page_type']] = v['file']
    end
    self.recollect_garbage
    nil
  end
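Only the parsers list is consumed here: entries with a truthy disabled flag are skipped, and the rest build the page_type => file map. A minimal sketch of the expected structure, with hypothetical page types and file names:

  require 'yaml'

  config = YAML.load(<<~YAML)
    parsers:
      - page_type: listings
        file: parsers/listings.rb
      - page_type: details
        file: parsers/details.rb
        disabled: true
  YAML

  config['parsers'].reject { |v| !!v['disabled'] }
                   .map { |v| [v['page_type'], v['file']] }
                   .to_h
  # => {"listings" => "parsers/listings.rb"}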
Load new pages by dequeuing from the API.
@return [Integer] amount of pages loaded

  # File lib/datahen/scraper/batch_parser.rb, line 192
  def load_pages
    self.dequeuer_is_alive!

    # calculate dequeue size
    max_dequeue_size = (self.worker_count * self.dequeue_scale).ceil
    current_size = self.pages.length
    dequeue_size = (self.dequeue_scale * (max_dequeue_size - current_size)).ceil
    if dequeue_size < 1
      return 0
    end
    dequeue_size = max_dequeue_size if dequeue_size > max_dequeue_size

    # reserve and get the pages to parse
    response = nil
    begin
      response = client.dequeue self.job_id,
        dequeue_size,
        self.page_types,
        config['parse_fetching_failed'],
        timeout: self.dequeue_timeout
    rescue Net::ReadTimeout, Net::OpenTimeout => e
      self.repeat_puts "Dequeue API call timeout! Contact infra team, your job needs a profile change"
      self.dequeuer_is_alive!
      return 0
    rescue => e
      raise e
    end
    self.dequeuer_is_alive!

    # ensure a valid response or try again
    if response.nil? || response.response.code.to_i != 200
      self.repeat_puts(response.nil? ? 'null' : response.body)
      self.recollect_garbage
      return 0
    end

    # add pages
    count = 0
    (JSON.parse(response.body) || []).each do |page|
      count += 1
      next if self.loaded_pages.has_key? page['gid']
      self.pages << (self.loaded_pages[page['gid']] = page)
    end
    response = nil
    self.dequeuer_is_alive!

    # recollect garbage to free some memory before parsing
    if count > 0
      @not_found = false
      self.recollect_garbage
      self.repeat_puts "Found #{count} page(s) to parse"
      self.second_dequeue_count += 1 unless self.second_dequeue_count > 1
    else
      @not_found = true
      self.no_repeat_puts NOT_FOUND_MSG
    end

    # return how many pages were loaded
    count
  end
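A worked example of the sizing logic above, with hypothetical values:

  worker_count  = 4
  dequeue_scale = 2
  max_dequeue_size = (worker_count * dequeue_scale).ceil                   # => 8
  current_size = 3                                                         # pages already queued
  dequeue_size = (dequeue_scale * (max_dequeue_size - current_size)).ceil  # => 10
  dequeue_size = max_dequeue_size if dequeue_size > max_dequeue_size       # clamped to 8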
Print the message only when it is different from the last recorded message.
@param [String] message Message to display.

  # File lib/datahen/scraper/batch_parser.rb, line 175
  def no_repeat_puts message
    return if message == self.last_message
    puts message
    self.last_message = message
    nil
  end
Execute the garbage collector once it has been requested the number of times given by #max_garbage.

  # File lib/datahen/scraper/batch_parser.rb, line 137
  def recollect_garbage
    self.garbage_mutex.synchronize do
      self.garbage_count += 1
      if self.garbage_count > self.max_garbage
        puts "Recollect garbage"
        GC.start
        self.garbage_count = 0
      end
    end
    nil
  end
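With the default max_garbage of 5, for example, only every sixth request actually triggers the garbage collector:

  # Calls 1-5 just bump the counter; call 6 pushes garbage_count past
  # max_garbage, runs GC.start and resets the counter to 0.
  6.times { parser.recollect_garbage }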
Print the message regardless of it being the same as the last message.
@param [String] message Message to display.

  # File lib/datahen/scraper/batch_parser.rb, line 166
  def repeat_puts message
    puts message
    self.last_message = message
    nil
  end
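A quick sketch contrasting the two logging helpers (reusing the hypothetical parser instance from above):

  parser.repeat_puts    "loading..."  # always prints, records last_message
  parser.no_repeat_puts "loading..."  # suppressed: equals last_message
  parser.no_repeat_puts "done"        # prints: the message changed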