class Akane::Storages::Bigquery

Public Class Methods

new(*) click to toggle source
Calls superclass method
# File lib/akane/storages/bigquery.rb, line 11
def initialize(*)
  super

  @client, @api = AkaneBigquery.make_bigquery_client(@config)

  @project_id = @config['project_id']
  @dataset_id = @config['dataset_id']

  @lock = Mutex.new
  @thread = nil

  @flush_interval = @config['flush_interval'] ? @config['flush_interval'].to_i : 60
  @flush_threshold = @config['flush_threshold'] ? @config['flush_threshold'].to_i : 1000

  @pending_inserts = []
  @failing_inserts = []
  @pending_inserts_lock = Mutex.new

  swap_buffers # initialize
  start
end

Public Instance Methods

bq_insert(table, row) click to toggle source
# File lib/akane/storages/bigquery.rb, line 37
def bq_insert(table, row)
  @lock.synchronize do
    @buffers[table] << row
  end
  self
end
exitable?() click to toggle source
# File lib/akane/storages/bigquery.rb, line 53
def exitable?
  @stop && (@thread ? @thread.alive? : true)
end
mark_as_deleted(account, user_id, tweet_id) click to toggle source
# File lib/akane/storages/bigquery.rb, line 109
def mark_as_deleted(account, user_id, tweet_id)
  bq_insert(:deletions,
    'user_id'.freeze => user_id,
    'user_id_str'.freeze => user_id.to_s,
    'tweet_id'.freeze => tweet_id,
    'tweet_id_str'.freeze => tweet_id.to_s,
    'deleted_at'.freeze => Time.now.to_i,
  )
end
name() click to toggle source
# File lib/akane/storages/bigquery.rb, line 33
def name
  @name ||= "bigquery:#{@project_id}/#{@dataset_id}"
end
record_event(account, event) click to toggle source
# File lib/akane/storages/bigquery.rb, line 119
def record_event(account, event)
  source = event['source'.freeze]
  target = event['target'.freeze]
  target_object = event['target_object'.freeze]

  source_id = source[:id]
  target_id = target[:id]

  unless source_id && target_id
    @logger.warn "Discarding event because source and target id is missing: #{event.inspect}"
    return
  end

  hash = Hash[
    event.map { |k,v| [k, v && v.respond_to?(:attrs) ? v.attrs : nil] }
  ]

  row = {
    'json'.freeze => hash.to_json,
    'event'.freeze => event['event'.freeze],
    'source_id'.freeze => source_id,
    'source_id_str'.freeze => source_id.to_s,
    'target_id'.freeze => target_id,
    'target_id_str'.freeze => target_id.to_s,
    'created_at'.freeze => Time.now.to_i
  }

  if target_object && target_object[:id]
    id = target_object[:id]
    row['target_object_id'.freeze] = id
    row['target_object_id_str'.freeze] = id.to_s
  end

  p row
  bq_insert :events, row
end
record_message(account, message) click to toggle source
# File lib/akane/storages/bigquery.rb, line 156
def record_message(account, message)
end
record_tweet(account, tweet) click to toggle source
# File lib/akane/storages/bigquery.rb, line 64
def record_tweet(account, tweet)
  hash = tweet.attrs
  row = {
    'json'.freeze => hash.to_json,
    'id_str'.freeze => hash[:id_str],
    'id'.freeze => hash[:id],
    'text'.freeze => hash[:text],
    'lang'.freeze => hash[:lang],
    'source'.freeze => hash[:source],
    'in_reply_to_status_id'.freeze => hash[:in_reply_to_status_id],
    'in_reply_to_status_id_str'.freeze => hash[:in_reply_to_status_id_str],
    'in_reply_to_user_id'.freeze => hash[:in_reply_to_user_id],
    'in_reply_to_user_id_str'.freeze => hash[:in_reply_to_user_id_str],
    'in_reply_to_screen_name'.freeze => hash[:in_reply_to_screen_name],
    'user'.freeze => {
      'id_str'.freeze => hash[:user][:id_str],
      'id'.freeze => hash[:user][:id],
      'name'.freeze => hash[:user][:name],
      'screen_name'.freeze => hash[:user][:screen_name],
      'protected'.freeze => hash[:user][:protected],
    },
    'created_at'.freeze => Time.parse(hash[:created_at]).to_i
  }

  if hash['coordinates'.freeze]
    row['coordinates_longitude'.freeze], row['coordinates_latitude'.freeze] = \
      hash[:coordinates][:coordinates]
  end

  if hash[:place]
    place = hash[:place]
    row['place'.freeze] = {
      'id'.freeze => place[:id],
      'country'.freeze => place[:country],
      'country_code'.freeze => place[:country_code],
      'name'.freeze => place[:name],
      'full_name'.freeze => place[:full_name],
      'place_type'.freeze => place[:place_type],
      'url'.freeze => place[:url],
    }
  end

  bq_insert :tweets, row
end
start() click to toggle source
# File lib/akane/storages/bigquery.rb, line 44
def start
  @lock.synchronize do
    unless @thread
      @thread = Thread.new(&method(:worker_loop))
      @stop = false
    end
  end
end
status() click to toggle source
# File lib/akane/storages/bigquery.rb, line 159
def status
  @buffers ? @buffers.map{ |table, buf| "#{table}=#{buf.size}" }.join(', ') + " | #{@failing_inserts.size} failures, #{@pending_inserts.size} inserts" : "-"
end
stop!() click to toggle source
Calls superclass method
# File lib/akane/storages/bigquery.rb, line 57
def stop!
  @lock.synchronize do
    super
    @thread.raise(Stop) if @thread
  end
end

Private Instance Methods

flush_buffer() click to toggle source
# File lib/akane/storages/bigquery.rb, line 209
def flush_buffer
  prev_buffers = swap_buffers()

  prev_buffers.each do |table, rows|
    next if rows.empty?

    insert_id_base = "#{Time.now.to_f}:#{rows.__id__}:#{table}"
    request = {
      api_method: @api.tabledata.insert_all,
      parameters: {
        'datasetId' => @dataset_id,
        'projectId' => @project_id,
        'tableId' => table.to_s,
      },
      body_object: {
        'rows' => rows.map.with_index { |row, index|
          {
            'insertId'.freeze => "#{insert_id_base}:#{index}",
            'json'.freeze => row,
          }
        }
      }
    }
    @pending_inserts_lock.synchronize do
      @logger.debug "Adding pending inserts for #{table}, #{rows.size} rows"
      @pending_inserts << {request: request, insert_id: insert_id_base}
    end
  end

  @last_flush = Time.now
end
flush_pending_inserts(do_failures = false) click to toggle source
# File lib/akane/storages/bigquery.rb, line 241
def flush_pending_inserts(do_failures = false)
  while failing_request = @failing_inserts.shift
    if do_failures || Time.now <= failing_request[:next_try]
      @logger.info "[#{name}] Retrying #{failing_request[:insert_id]}"
      @pending_inserts_lock.synchronize { @pending_inserts.push(failing_request) }
    end
  end

  while request = @pending_inserts_lock.synchronize { @pending_inserts.shift }
    table = request[:request][:parameters]['tableId']
    result = @client.execute(request[:request])

    if result.error?
      if request[:retry]
        request[:retry] *= 1.8
      else
        request[:retry] = 5
      end

      request[:next_try] = Time.now + request[:retry]

      @logger.error "[#{name}] Failed #{table} to insert: #{result.error_message} (#{request[:insert_id]}); retrying in #{request[:retry]} seconds"
      @failing_inserts << request
    else
      @logger.debug "[#{name}] Inserted records in #{table}"
    end
  end
end
swap_buffers() click to toggle source
# File lib/akane/storages/bigquery.rb, line 165
def swap_buffers
  @lock.synchronize do
    old_buffers = @buffers
    @buffers = {tweets: [], messages: [], deletions: [], events: []}

    old_buffers
  end
end
worker_loop() click to toggle source
# File lib/akane/storages/bigquery.rb, line 174
def worker_loop
  @last_flush = Time.now
  retry_interval = 1

  begin
    flush_pending_inserts

    loop do
      if @flush_interval <= (Time.now - @last_flush) || @flush_threshold <= @buffers.values.map(&:size).inject(:+)
        flush_buffer
      end

      flush_pending_inserts

      sleep 1
    end
  rescue Stop
    @logger.info "Flushing buffer for graceful quit"
    flush_buffer
    until @pending_inserts.empty? && @failing_inserts.empty?
      flush_pending_inserts(true)
      sleep 10 unless @failing_inserts.empty?
    end
  rescue Exception => e
    @logger.error "#{name} - Encountered error on buffer worker"
    @logger.error e.inspect
    @logger.error e.backtrace.join("\n")

    @logger.error "Retrying after #{retry_interval.to_i}"
    sleep retry_interval.to_i
    retry_interval *= 1.8
    retry
  end
end