class ElasticsearchMysqlImporter::Importer

Attributes

output_file[RW]

Public Instance Methods

configure() { |configuration| ... } click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 12
def configure
  @configuration ||= Configuration.new
  yield(@configuration) if block_given?
  validate_configuration
end
write_elasticsearch() click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 25
def write_elasticsearch
  call_elasticsearch_bulk_api
end
write_file() click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 18
def write_file
  if @configuration.output_file.nil?
    raise "Missing Configuration: 'output_file' is required."
  end
  create_import_file
end

Private Instance Methods

call_elasticsearch_bulk_api() click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 103
def call_elasticsearch_bulk_api
  begin
    elasticsearch_bulk_uri = "http://#{@configuration.elasticsearch_host}:#{@configuration.elasticsearch_port}/_bulk"
    uri = URI.parse(elasticsearch_bulk_uri)
    data = File.open(@output_file, 'r').read
    raise "Error: generated import file is empty." if data.empty?
    http = Net::HTTP.new(uri.host, uri.port)
    response, body = http.post(uri.path, data, {'Content-type'=>'application/json'})
  rescue Timeout::Error, StandardError => e
    puts "Failed to call Bulk API: #{e.message}"
  end
end
connect_db() click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 39
def connect_db
  if not @configuration.mysql_socket.nil?
    # not tested yet
    Mysql2::Client.new({
      :host => @configuration.mysql_host,
      :socket => @configuration.mysql_socket,
      :username => @configuration.mysql_username,
      :password => @configuration.mysql_password,
      :database => @configuration.mysql_database,
      :encoding => @configuration.mysql_encoding,
      :reconnect => true
    })
  else
    Mysql2::Client.new({
      :host => @configuration.mysql_host,
      :port => @configuration.mysql_port,
      :username => @configuration.mysql_username,
      :password => @configuration.mysql_password,
      :database => @configuration.mysql_database,
      :encoding => @configuration.mysql_encoding,
      :reconnect => true
    })
  end
end
create_import_file() click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 74
def create_import_file
  begin
    file = get_file_io_object
    db = connect_db
    db.query(@configuration.prepared_query, @configuration.mysql_options)
    db.query(@configuration.query, @configuration.mysql_options).each do |row|
      row.select {|k, v| v.to_s.strip.match(/^SELECT/i) }.each do |k, v|
        row[k] = [] unless row[k].is_a?(Array)
        db.query(v.gsub(/\$\{([^\}]+)\}/) {|matched| row[$1].to_s}).each do |nest_row|
          row[k] << nest_row
        end
      end
      header = { 
        "index" => {
          "_index" => @configuration.elasticsearch_index,
          "_type" => @configuration.elasticsearch_type,
          "_id" => row[@configuration.primary_key]
        }
      }
      file.puts(Yajl::Encoder.encode(header))
      file.puts(Yajl::Encoder.encode(row))
    end
    file.seek 0
    return file.path
  rescue StandardError => e
    puts "Failed to generate import file: #{e.message}"
  end
end
get_file_io_object() click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 64
def get_file_io_object
  if @configuration.output_file.nil?
    file = Tempfile.open(['elasticsearch_mysql_importer_','.json'])
  else
    file = File.open(@configuration.output_file, 'w+')
  end
  @output_file = file.path
  return file
end
validate_configuration() click to toggle source
# File lib/elasticsearch_mysql_importer/importer.rb, line 30
def validate_configuration
  if @configuration.mysql_database.nil? or @configuration.query.nil?
    raise "Missing Configuration: 'mysql_database' and 'query' are required."
  end
  if @configuration.elasticsearch_index.nil? or @configuration.elasticsearch_type.nil?
    raise "Missing Configuration: 'elasticsearch_index' and 'elasticsearch_type' are required."
  end
end