class Datahen::Scraper::BatchParser

Constants

NOT_FOUND_MSG
NO_DEQUEUE_COUNT_MSG
NO_WORKERS_MSG

Attributes

client[R]

Datahen job pages client used to dequeue pages from the API. @return [Datahen::Client::JobPage] datahen job pages API client

config[R]

Current config file loaded. @return [Hash] current loaded configuration

config_file[RW]

Configuration file path. @return [String] config file path

dequeue_interval[R]

Dequeue interval in seconds. @return [Integer] dequeue interval in seconds

dequeue_mutex[R]

Dequeuer mutex used to synchronize page dequeuing. @return [Mutex] dequeuer mutex

dequeue_scale[R]

Dequeue scale used to calculate the ideal dequeue size. @return [Numeric] dequeue scale

dequeue_timeout[RW]

Dequeue API request timeout in seconds. @return [Integer] dequeue API request timeout in seconds

dequeuer_still_alive[R]

Dequeuer last run unix timestamp. @return [Integer] dequeuer last run unix timestamp

dequeuer_thread[R]

Current dequeuer thread. @return [Thread] dequeuer thread

garbage_count[RW]

Garbage collector request counter. @return [Integer] garbage collector counter

garbage_mutex[R]

Garbage collector mutex used to synchronize garbage collector requests. @return [Mutex] garbage collector mutex

job_id[R]

Job id to be executed. @return [Integer] job id

last_message[RW]

Last printed message, useful to prevent duplicated log messages. @return [String] last printed message

loaded_pages[R]

Loaded pages hash, useful to avoid duplicates on the loaded pages array. @return [Concurrent::Hash<String, Hash>] loaded pages as a concurrent hash

max_garbage[R]

Max garbage collector requests before actually executing the garbage collector. @return [Integer] max garbage request quantity before actually executing it

not_found[R]

Indicates whether the wait time is because there are no more pages. @return [Boolean] `true` when wait time is due to no more pages, else `false`

page_types[R]

Known page types extracted from the config file. @return [Array<String>] known page types

pages[R]

Loaded pages array. @return [Concurrent::Array<Hash>] loaded pages as an array

parsers[R]

Known parsers extracted from the config file. @return [Concurrent::Hash<String, String>] known parsers

second_dequeue_count[RW]

Second dequeue counter used to prevent false negative warning messages. @return [Integer] second dequeue counter

worker_count[R]

Parallel worker quantity. @return [Integer] parallel worker quantity

Public Class Methods

new(job_id, config_file, opts = {})

Initialize a batch parser object.

@param [Integer] job_id Job id.
@param [String] config_file Config file path.
@param [Hash] opts ({}) Configuration options.
@option opts [Integer] :worker_count (1) Parallel worker quantity.
@option opts [Integer] :max_garbage (5) Max amount of times the garbage collector can be requested before actually executing.
@option opts [Integer] :dequeue_interval (3) Time in seconds to wait between page dequeuing.
@option opts [Numeric] :dequeue_scale (2) Scaling factor used to calculate page dequeue size.
@option opts [Numeric] :dequeue_timeout (30) Page dequeue API request timeout in seconds.
@option opts [Hash] :client_options ({}) Datahen client gem additional options (see Datahen::Client::Base#initialize method).

# File lib/datahen/scraper/batch_parser.rb, line 105
def initialize(job_id, config_file, opts = {})
  opts = {
    worker_count: 1,
    max_garbage: 5,
    dequeue_interval: 3,
    dequeue_scale: 2,
    dequeue_timeout: 30,
    client_options: {}
  }.merge opts

  @job_id = job_id
  @worker_count = opts[:worker_count]
  @dequeue_interval = opts[:dequeue_interval]
  @dequeue_scale = opts[:dequeue_scale]
  @max_garbage = opts[:max_garbage]
  @pages = Concurrent::Array.new
  @loaded_pages = Concurrent::Hash.new
  @garbage_mutex = Mutex.new
  @dequeue_mutex = Mutex.new
  @not_found = false
  self.dequeue_timeout = opts[:dequeue_timeout]
  self.second_dequeue_count = 0
  self.garbage_count = 0
  self.config_file = config_file
  self.load_config

  @client = Datahen::Client::JobPage.new(opts[:client_options])
  nil
end
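
A minimal usage sketch (the job id and config path are hypothetical; every option shown except worker_count is the default):

batch_parser = Datahen::Scraper::BatchParser.new(
  12345,                # hypothetical job id
  './config.yaml',      # hypothetical config file path
  worker_count: 4,      # spawn 4 parallel parser workers
  max_garbage: 5,       # GC requests allowed before a real collection
  dequeue_interval: 3,  # seconds to wait between dequeue API calls
  dequeue_scale: 2,     # dequeue up to worker_count * 2 pages per call
  dequeue_timeout: 30   # dequeue API request timeout in seconds
)
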
timestamp()

Get a unix timestamp. @return [Integer] unix timestamp

# File lib/datahen/scraper/batch_parser.rb, line 86
def self.timestamp
  Time.new.utc.to_i
end
wait(time_in_seconds)

Wait a specific number of seconds. @param [Integer] time_in_seconds Seconds to wait.

# File lib/datahen/scraper/batch_parser.rb, line 80
def self.wait time_in_seconds
  Kernel.sleep time_in_seconds
end

Public Instance Methods

dequeue_pages()

Dequeue one page from the previously loaded pages, waiting until new pages arrive whenever none are loaded.

@return [Hash] dequeued page

# File lib/datahen/scraper/batch_parser.rb, line 291
def dequeue_pages
  # collect garbage
  self.recollect_garbage

  # return page if there are loaded pages
  is_waiting = false
  while true do
    page = self.pages.shift
    unless page.nil?
      puts "[Worker #{Parallel.worker_number}]: Finish waiting" if is_waiting
      loaded_pages.delete(page['gid'])
      return page
    end

    # be more verbose on worker waiting
    unless is_waiting
      is_waiting = true
      puts "[Worker #{Parallel.worker_number}]: Is waiting for a page..."
      if self.second_dequeue_count > 1 && !self.not_found
        puts "\nWARNING: Your job is not optimized, increase your job's \"parser_dequeue_scale\"\n"
      end
    end
    self.class.wait 1

    # ensure the dequeuer thread is alive and healthy
    self.ensure_dequeuer_thread
  end
end
dequeuer_is_alive!()

Refresh the dequeuer's still-alive timestamp.

# File lib/datahen/scraper/batch_parser.rb, line 183
def dequeuer_is_alive!
  self.dequeue_mutex.synchronize do
    @dequeuer_still_alive = self.class.timestamp
  end
  nil
end
ensure_dequeuer_thread()

Ensures that the dequeuer thread exists and is running. @return [Boolean] `true` if the thread was alive, or `false` if a new thread had to be created

# File lib/datahen/scraper/batch_parser.rb, line 256
def ensure_dequeuer_thread
  self.dequeue_mutex.synchronize do
    # check if dequeuer thread is alive and healthy
    if !self.dequeuer_thread.nil? && self.dequeuer_thread.alive?
      still_alive_timeout = (self.dequeue_timeout + self.dequeue_interval) * 2 + self.dequeuer_still_alive
      return true if self.class.timestamp < still_alive_timeout

      # kill dequeuer thread
      self.repeat_puts "Dequeuer isn't healthy, will restart it..."
      self.dequeuer_thread.kill
      @dequeuer_thread = nil
      self.recollect_garbage
      self.no_repeat_puts "Dequeuer thread was killed!"
    end

    # dequeuing in parallel (the ride never ends :D)
    @dequeuer_thread = Thread.new do
      while true
        begin
          self.load_pages
          self.class.wait self.dequeue_interval
        rescue => e
          puts [e.message] + e.backtrace rescue 'error'
        end
      end
      puts "Error: dequeuer died! D:"
    end
    self.repeat_puts "Dequeuer thread was started!"
  end
  false
end
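
The health check above treats the dequeuer as stalled once its heartbeat is older than (dequeue_timeout + dequeue_interval) * 2 seconds. A quick sketch of that grace period with the default options:

dequeue_timeout  = 30 # default
dequeue_interval = 3  # default
grace_period = (dequeue_timeout + dequeue_interval) * 2
puts grace_period # => 66 seconds allowed since the last dequeuer_is_alive! call
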
exec_parse(save = false, keep_outputs = false)

Dequeue pages and execute the parsers associated with them in parallel.

# File lib/datahen/scraper/batch_parser.rb, line 321
def exec_parse save = false, keep_outputs = false
  if self.worker_count < 1
    self.no_repeat_puts NO_WORKERS_MSG
    return
  else
    self.no_repeat_puts "Spawing #{self.worker_count} workers"
  end

  # start dequeuer
  self.ensure_dequeuer_thread

  # process the pages
  dequeue = lambda{ self.dequeue_pages }
  Parallel.each(dequeue, in_threads: (worker_count)) do |page|
    parser_file = self.parsers[page['page_type']]
    begin
      self.repeat_puts("Parsing page with GID #{page['gid']}")
      puts Datahen::Scraper::Parser.exec_parser_by_page(
        parser_file,
        page,
        job_id,
        save,
        nil,
        keep_outputs
      )
      self.repeat_puts("Finish parsing page with GID #{page['gid']}")
    rescue Parallel::Kill => e
      puts "[Worker #{Parallel.worker_number}]: Someone tried to kill Parallel!!!"
    rescue Parallel::Break => e
      puts "[Worker #{Parallel.worker_number}]: Someone tried to break Parallel!!!"
    rescue => e
      puts [e.message] + e.backtrace rescue 'error'
    end
  end

  nil
end
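
exec_parse hands dequeue_pages to Parallel as a producer lambda, so each worker pulls the next page as soon as it finishes the current one. An end-to-end sketch, reusing the hypothetical job id and config path from the constructor example:

batch_parser = Datahen::Scraper::BatchParser.new(12345, './config.yaml', worker_count: 4)

# Both flags are forwarded to Datahen::Scraper::Parser.exec_parser_by_page:
# save persists the parsing results, keep_outputs retains the parser outputs.
batch_parser.exec_parse(true, true)
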
load_config()

Loads the config file into a Hash.

# File lib/datahen/scraper/batch_parser.rb, line 150
def load_config
  # build page type to script file map
  @page_types = []
  @parsers = Concurrent::Hash.new
  @config = YAML.load_file(config_file)
  self.config['parsers'].each do |v|
    next if !v['disabled'].nil? && !!v['disabled']
    @page_types << v['page_type']
    self.parsers[v['page_type']] = v['file']
  end
  self.recollect_garbage
  nil
end
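
For illustration, a minimal config file shape that this method understands; only the parsers key is read here, and entries with disabled: true are skipped (page types and file names are hypothetical):

parsers:
  - page_type: listings
    file: ./parsers/listings.rb
  - page_type: details
    file: ./parsers/details.rb
  - page_type: legacy
    file: ./parsers/legacy.rb
    disabled: true # skipped by load_config
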
load_pages()

Load new pages by dequeuing from the API. @return [Integer] number of pages loaded

# File lib/datahen/scraper/batch_parser.rb, line 192
def load_pages
  self.dequeuer_is_alive!

  # calculate dequeue size
  max_dequeue_size = (self.worker_count * self.dequeue_scale).ceil
  current_size = self.pages.length
  dequeue_size = (self.dequeue_scale * (max_dequeue_size - current_size)).ceil
  if dequeue_size < 1
    return 0
  end
  dequeue_size = max_dequeue_size if dequeue_size > max_dequeue_size

  # reserve and get pages to parse
  response = nil
  begin
    response = client.dequeue self.job_id,
      dequeue_size,
      self.page_types,
      config['parse_fetching_failed'],
      timeout: self.dequeue_timeout
  rescue Net::ReadTimeout, Net::OpenTimeout => e
    self.repeat_puts "Dequeue API call timeout! Contact infra team, your job needs a profile change"
    self.dequeuer_is_alive!
    return 0
  rescue => e
    raise e
  end
  self.dequeuer_is_alive!

  # ensure a valid response or try again
  if response.nil? || response.response.code.to_i != 200
    self.repeat_puts(response.nil? ? 'null' : response.body)
    self.recollect_garbage
    return 0
  end

  # add pages
  count = 0
  (JSON.parse(response.body) || []).each do |page|
    count += 1
    next if self.loaded_pages.has_key? page['gid']
    self.pages << (self.loaded_pages[page['gid']] = page)
  end
  response = nil
  self.dequeuer_is_alive!

  # recollect garbage to free some memory before parsing
  if count > 0
    @not_found = false
    self.recollect_garbage
    self.repeat_puts "Found #{count} page(s) to parse"
    self.second_dequeue_count += 1 unless self.second_dequeue_count > 1
  else
    @not_found = true
    self.no_repeat_puts NOT_FOUND_MSG
  end

  # return how many pages were loaded
  count
end
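
A worked example of the dequeue-size arithmetic above, assuming 4 workers, the default dequeue_scale of 2, and 3 pages already loaded:

worker_count  = 4
dequeue_scale = 2
current_size  = 3

max_dequeue_size = (worker_count * dequeue_scale).ceil                  # => 8
dequeue_size = (dequeue_scale * (max_dequeue_size - current_size)).ceil # => 10
dequeue_size = max_dequeue_size if dequeue_size > max_dequeue_size      # capped at 8
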
no_repeat_puts(message)

Print the message only when it is different from the last recorded message.

@param [String] message Message to display.

# File lib/datahen/scraper/batch_parser.rb, line 175
def no_repeat_puts message
  return if message == self.last_message
  puts message
  self.last_message = message
  nil
end
recollect_garbage()

Execute the garbage collector after it has been requested as many times as described by #max_garbage.

# File lib/datahen/scraper/batch_parser.rb, line 137
def recollect_garbage
  self.garbage_mutex.synchronize do
    self.garbage_count += 1
    if self.garbage_count > self.max_garbage
      puts "Recollect garbage"
      GC.start
      self.garbage_count = 0
    end
  end
  nil
end
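
In effect, with the default max_garbage of 5, GC.start only runs on the request that pushes the counter past 5, after which the counter resets, so roughly one in every six requests performs a real collection:

parser = Datahen::Scraper::BatchParser.new(12345, './config.yaml') # hypothetical
# recollect_garbage is also called internally (e.g. by load_config),
# so the exact trigger point depends on prior activity.
10.times { parser.recollect_garbage }
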
repeat_puts(message)

Print the message regardless of whether it is the same as the last message. @param [String] message Message to display.

# File lib/datahen/scraper/batch_parser.rb, line 166
def repeat_puts message
  puts message
  self.last_message = message
  nil
end