class AkaneBigquery::CLI

Public Instance Methods

init() click to toggle source
# File lib/akane-bigquery/cli.rb, line 16
# Makes sure the configured BigQuery dataset exists, along with one table per
# entry in AkaneBigquery::Schema::SCHEMA, creating whatever is reported
# missing.
#
# A lookup whose error message starts with "Not Found:" triggers creation;
# any other error message (from a lookup or from a creation request) is
# raised as a RuntimeError.
def init
  project_id = config['project_id']
  dataset_id = config['dataset_id']
  missing = /^Not Found:/i

  # Dataset: look it up first, create it only when the lookup says "Not Found".
  dataset_lookup = client.execute(
    api_method: api.datasets.get,
    parameters: { 'projectId' => project_id, 'datasetId' => dataset_id }
  )

  if dataset_lookup.error?
    raise dataset_lookup.error_message unless dataset_lookup.error_message =~ missing

    puts "Creating dataset #{dataset_id} ..."
    dataset_insert = client.execute(
      api_method: api.datasets.insert,
      parameters: { 'projectId' => project_id },
      body_object: {
        'datasetReference' => { 'datasetId' => dataset_id },
        'description' => 'akane',
      }
    )
    raise dataset_insert.error_message if dataset_insert.error?
  end

  # Tables: same get-or-create dance for every table in the schema map.
  AkaneBigquery::Schema::SCHEMA.each do |table_id, schema|
    table_lookup = client.execute(
      api_method: api.tables.get,
      parameters: {
        'projectId' => project_id,
        'datasetId' => dataset_id,
        'tableId' => table_id,
      }
    )

    next unless table_lookup.error?
    raise table_lookup.error_message unless table_lookup.error_message =~ missing

    puts "Creating table #{table_id} ..."
    table_insert = client.execute(
      api_method: api.tables.insert,
      parameters: { 'projectId' => project_id, 'datasetId' => dataset_id },
      body_object: {
        'tableReference' => {
          'projectId' => project_id,
          'datasetId' => dataset_id,
          'tableId' => table_id,
        },
        'friendlyName' => table_id,
        'schema' => schema,
      }
    )
    raise table_insert.error_message if table_insert.error?
  end
end
prepare(source, prefix) click to toggle source
# File lib/akane-bigquery/cli.rb, line 91
# Converts the per-user tweet archives found under "<source>/users/*/" into
# newline-delimited JSON chunk files named "tweets.NNNN.txt" inside +prefix+.
#
# Every input line is parsed as one tweet and re-emitted as a flattened row
# that also carries the original JSON text under the "json" key.
#
# Options read from +options+:
#   :months - comma separated list; only "tweets.<month>.txt" of each user is
#             read (files that do not exist are skipped). Without it every
#             "tweets.*.txt" file of the user is read.
#   :before - time string; tweets created at or after that time are skipped.
#
# source - directory containing the "users" directory.
# prefix - existing directory that receives the output chunk files.
#
# A new chunk file is started whenever the bytes written to the current one
# reach the 500MB limit. Every chunk is closed when it is rotated out, and the
# last one is closed before the method returns (even on error), so the data is
# fully flushed to disk once this method is done.
def prepare(source, prefix)
  limit = 524288000 # 500MBytes

  count = -1
  bytes = 0
  io = nil

  # Closes the current chunk (if any) and opens the next numbered one.
  new_io = lambda do
    io.close if io
    bytes = 0
    count += 1
    path = File.join(prefix, "tweets.#{count.to_s.rjust(4,'0')}.txt")
    puts "=> Using #{path}"
    File.open(path, 'w')
  end
  io = new_io.call

  begin
    months = options[:months] && options[:months].split(/,/)
    before = options[:before] && Time.parse(options[:before])

    # Dir.entries also yields "." and ".."; they are skipped below but still
    # count towards the progress figures.
    userdirs = Dir.entries(File.join(source, "users"))
    userdirs.each_with_index do |user_dirname, index|
      next if user_dirname == "." || user_dirname == ".."
      puts " * #{user_dirname} (#{index.succ}/#{userdirs.size}, #{((index.succ/userdirs.size.to_f)*100).to_i}%)"

      userdir = File.join(source, "users", user_dirname)

      tweet_filepaths = if months
                          months.map { |month| File.join(userdir, "tweets.#{month}.txt") }
                        else
                          Dir[File.join(userdir, 'tweets.*.txt')]
                        end

      tweet_filepaths.each do |file|
        begin
          File.open(file, 'r') do |tweets_io|
            tweets_io.each_line do |line|
              json = line.chomp
              next if json.empty? # tolerate blank lines in the archive

              tweet = Oj.load(json)

              created_at = Time.parse(tweet['created_at'.freeze])
              next if before && before <= created_at

              user = tweet['user'.freeze]
              new_json = {
                'json'.freeze => json,
                'id_str'.freeze => tweet['id_str'.freeze],
                'id'.freeze => tweet['id'.freeze],
                'text'.freeze => tweet['text'.freeze],
                'lang'.freeze => tweet['lang'.freeze],
                'source'.freeze => tweet['source'.freeze],
                'in_reply_to_status_id'.freeze => tweet['in_reply_to_status_id'.freeze],
                'in_reply_to_status_id_str'.freeze => tweet['in_reply_to_status_id_str'.freeze],
                'in_reply_to_user_id'.freeze => tweet['in_reply_to_user_id'.freeze],
                'in_reply_to_user_id_str'.freeze => tweet['in_reply_to_user_id_str'.freeze],
                'in_reply_to_screen_name'.freeze => tweet['in_reply_to_screen_name'.freeze],
                'user'.freeze => {
                  'id_str'.freeze => user['id_str'.freeze],
                  'id'.freeze => user['id'.freeze],
                  'name'.freeze => user['name'.freeze],
                  'screen_name'.freeze => user['screen_name'.freeze],
                  'protected'.freeze => user['protected'.freeze],
                },
                'created_at'.freeze => created_at.to_i
              }

              coordinates = tweet['coordinates'.freeze]
              if coordinates
                # The pair is stored as [longitude, latitude].
                new_json['coordinates_longitude'.freeze] = coordinates['coordinates'.freeze][0]
                new_json['coordinates_latitude'.freeze] = coordinates['coordinates'.freeze][1]
              end

              place = tweet['place'.freeze]
              if place
                new_json['place'.freeze] = {
                  'id'.freeze => place['id'.freeze],
                  'country'.freeze => place['country'.freeze],
                  'country_code'.freeze => place['country_code'.freeze],
                  'name'.freeze => place['name'.freeze],
                  'full_name'.freeze => place['full_name'.freeze],
                  'place_type'.freeze => place['place_type'.freeze],
                  'url'.freeze => place['url'.freeze],
                }
              end

              new_json_str = Oj.dump(new_json)
              io.puts new_json_str
              # The limit is expressed in bytes, so count bytes (not
              # characters) plus the newline written by puts.
              bytes += new_json_str.bytesize + 1
              io = new_io.call if limit <= bytes
            end
          end
        rescue Errno::ENOENT
          # Deliberately ignored: a requested tweets.<month>.txt may simply
          # not exist for this user.
        end
      end
    end
  ensure
    # io may already be closed when opening the next chunk failed mid-rotation.
    io.close unless io.closed?
  end
end

Private Instance Methods

api() click to toggle source
# File lib/akane-bigquery/cli.rb, line 207
# Returns the value stored in @api, after calling client_and_api so that the
# client/API pair has been set up.
def api
  client_and_api
  @api
end
client() click to toggle source
# File lib/akane-bigquery/cli.rb, line 203
# Returns the value stored in @client, after calling client_and_api so that
# the client/API pair has been set up.
def client
  client_and_api
  @client
end
client_and_api() click to toggle source
# File lib/akane-bigquery/cli.rb, line 211
# Builds the client/API pair once via AkaneBigquery.make_bigquery_client(config),
# stores it in @client_and_api and splits it into @client and @api.
# Later calls return the memoized pair without rebuilding it.
def client_and_api
  unless @client_and_api
    @client_and_api = AkaneBigquery.make_bigquery_client(config)
    @client, @api = @client_and_api
  end

  @client_and_api
end
config() click to toggle source
# File lib/akane-bigquery/cli.rb, line 188
# Returns the "bigquery" section of the selected storage entry from the YAML
# file named by options[:config]; the result is memoized in @config.
#
# The file's "storages" list is searched for entries having a "bigquery" key:
# with options[:config_name] set, the first one whose "bigquery"/"name" equals
# it is used, otherwise the first one found.
#
# Aborts the process with an error message when no entry matches, including
# when the file is empty or has no "storages" key at all.
def config
  @config ||= begin
    document = YAML.load_file(options[:config])
    # An empty file or a missing "storages" key is treated as "no storages"
    # so it ends in the abort below instead of a NoMethodError.
    storages = (document && document['storages']) || []

    wanted_name = options[:config_name]
    conf = storages.find do |storage|
      bigquery = storage['bigquery']
      bigquery && (!wanted_name || bigquery['name'] == wanted_name)
    end

    (conf && conf['bigquery']) ||
      abort('error: bigquery storage configuration not found')
  end
end