class FileMiner

Constants

DEFAULT_SETTINGS
VERSION

Attributes

miner[R]
output[R]
running[R]

Public Class Methods

new(conf) click to toggle source

Create a new FileMiner instance

@param [Hash] conf

# File lib/fileminer.rb, line 25
def initialize(conf)
  init_settings conf['fileminer.settings']
  @output = init_output conf
  raise 'Missing config fileminer.inputs' unless conf.key? 'fileminer.inputs'
  @miner = Miner.new conf['fileminer.inputs'].keys_to_sym
  @miner.refresh_files
  @miner.save_registry
  @running = false
end

Public Instance Methods

check_files() click to toggle source
# File lib/fileminer.rb, line 221
def check_files
  if @miner.files_need_refresh? @refresh_files_time_trigger
    @miner.refresh_files
  end
end
mine_once() click to toggle source
# File lib/fileminer.rb, line 171
def mine_once
  start_time = Time.now
  full_lines = 0
  miner = @miner
  miner.active_files.all? do |record|
    mining_next = true
    if record[:pos] < File.size(record[:path])
      file_lines = 0
      loop do
        lines = miner.read_lines record
        break if lines.empty?
        send_lines record, lines
        file_lines += lines.size
        full_lines += lines.size
        if mining_break? start_time, full_lines
          mining_next = false
          break
        end
        break if file_break? file_lines
      end
    end
    mining_next
  end
  full_lines
end
start_mining() click to toggle source
# File lib/fileminer.rb, line 197
def start_mining
  unless @running
    @running = true
    sleep_seconds = @sleep_seconds_when_no_more_data
    while @running
      begin
        files_refreshed = check_files
        sent_lines = mine_once
        # sleep 5 seconds if no more data
        # TODO using settings instead in future
        if sent_lines == 0
          @miner.save_work_status if files_refreshed
          sleep sleep_seconds
        end
      rescue => e
        @logger.error e
        # sleep for a little while to wait output recover
        sleep sleep_seconds if @running
      end
    end
    @miner.save_registry
  end
end
stop_mining() click to toggle source
# File lib/fileminer.rb, line 227
def stop_mining
  @running = false if @running
end

Private Instance Methods

file_break?(lines) click to toggle source
# File lib/fileminer.rb, line 94
def file_break?(lines)
  @file_break_trigger.call lines
end
init_output(conf) click to toggle source
# File lib/fileminer.rb, line 98
def init_output(conf)
  case
  when conf.key?('output.redis')
    redis_conf = conf['output.redis'].keys_to_sym
    init_output_redis redis_conf
  when conf.key?('output.kafka')
    kafka_conf = conf['output.kafka'].keys_to_sym
    init_output_kafka kafka_conf
  when conf.key?('output.mysql')
    mysql_conf = conf['output.mysql'].keys_to_sym
    init_output_mysql mysql_conf
  when conf.key?('output.script')
    script_conf = conf['output.script'].keys_to_sym
    init_output_script script_conf
  else
    raise 'Missing config for output'
  end
end
init_output_kafka(kafka_conf) click to toggle source
# File lib/fileminer.rb, line 122
def init_output_kafka(kafka_conf)
  require_relative 'fileminer/output/kafka'
  kafka_conf[:mode] = kafka_conf[:mode] == 'async' ? :async : :sync
  if kafka_conf[:mode] == :async
    kafka_conf[:auto_delivery] = kafka_conf[:auto_delivery] == 'enabled' ? :enabled : :disabled
    if kafka_conf[:auto_delivery] == :enabled
      delivery_threshold = kafka_conf.delete :delivery_threshold
      delivery_interval = kafka_conf.delete :delivery_interval
      raise 'Missing conf delivery_threshold or delivery_interval' if delivery_threshold.nil? && delivery_interval.nil?
      kafka_conf[:delivery_conf] = delivery_conf = Hash.new
      delivery_conf[:delivery_threshold] = delivery_threshold unless delivery_threshold.nil?
      delivery_conf[:delivery_interval] = delivery_interval unless delivery_interval.nil?
    end
  end
  Output::KafkaPlugin.new kafka_conf
end
init_output_mysql(mysql_conf) click to toggle source
# File lib/fileminer.rb, line 139
def init_output_mysql(mysql_conf)
  require_relative 'fileminer/output/mysql'
  mysql_conf[:ssl_mode] = mysql_conf[:ssl_mode] == 'enabled' ? :enabled : :disabled
  Output::MysqlPlugin.new mysql_conf
end
init_output_redis(redis_conf) click to toggle source
# File lib/fileminer.rb, line 117
def init_output_redis(redis_conf)
  require_relative 'fileminer/output/redis'
  Output::RedisPlugin.new redis_conf
end
init_output_script(script_conf) click to toggle source
# File lib/fileminer.rb, line 145
def init_output_script(script_conf)
  script_path = script_conf[:script]
  plugin_class_name = script_conf[:plugin_class]
  init_options = script_conf[:init_options] || {}
  require script_path
  plugin_class = Object.const_get plugin_class_name
  plugin_class.new init_options.keys_to_sym
end
init_settings(conf) click to toggle source
# File lib/fileminer.rb, line 36
def init_settings(conf)
  if conf.nil?
    conf = DEFAULT_SETTINGS.clone
  else
    conf = DEFAULT_SETTINGS.merge conf.keys_to_sym
  end
  # default logger to stderr
  # TODO make logger configurable in future
  @logger = Logger.new STDERR
  @logger.level = Logger::WARN
  # mining break trigger
  max_time_of_each_mining = parse_time conf[:max_time_of_each_mining], 'max_time_of_each_mining on fileminer.settings'
  max_lines_of_each_mining = conf[:max_lines_of_each_mining]
  if max_lines_of_each_mining >= 0
    @mining_break_trigger = lambda { |start_time, lines| Time.now - start_time > max_time_of_each_mining || lines >= max_lines_of_each_mining }
  else
    @mining_break_trigger = lambda { |start_time, lines| Time.now - start_time > max_time_of_each_mining }
  end
  # file break trigger
  max_lines_of_each_file = conf[:max_lines_of_each_file]
  if max_lines_of_each_file >= 0
    @file_break_trigger = lambda { |lines| lines < @miner.batch_lines || lines >= max_lines_of_each_file }
  else
    @file_break_trigger = lambda { |lines| lines < @miner.batch_lines }
  end
  # refresh_files_time_trigger
  @refresh_files_time_trigger = parse_time conf[:refresh_files_time_trigger], 'refresh_files_time_trigger on fileminer.settings'
  # sleep seconds when no more data
  @sleep_seconds_when_no_more_data = parse_time conf[:sleep_time_when_no_more_data], 'sleep_time_when_no_more_data on fileminer.settings'
end
mining_break?(start_time, lines) click to toggle source
# File lib/fileminer.rb, line 90
def mining_break?(start_time, lines)
  @mining_break_trigger.call start_time, lines
end
parse_time(value, conf_name) click to toggle source
# File lib/fileminer.rb, line 67
def parse_time(value, conf_name)
  if /^(\d+)(\w+)$/ =~ value
    num = $1.to_i
    unit = $2
    case unit
    when 'd'
      num * 86400
    when 'h'
      num * 3600
    when 'min'
      num * 60
    when 's'
      num
    when 'ms'
      num.to_f / 1000
    else
      raise "Unsupported time unit '#{unit}' of #{conf_name}"
    end
  else
    raise "Error format '#{value}' of #{conf_name}"
  end
end
send_lines(record, lines) click to toggle source
# File lib/fileminer.rb, line 154
def send_lines(record, lines)
  if @output.batch?
    @output.send_all lines do
      record[:pos] = lines[-1][:end]
      @miner.save_registry
    end
  else
    lines.each do |line|
      @output.send line do
        record[:pos] = line[:end]
        @miner.save_registry
      end
    end
  end
end