class Embulk::InputTwitterStream

Constants

COLUMN_NAMES

Public Class Methods

new(task, schema, index, page_builder)
Calls superclass method
# File lib/embulk/input_twitterstream.rb, line 75
# Creates the plugin instance by handing all four arguments straight to the
# superclass constructor; no extra state is set up here.
#
# @param task the task object supplied by the caller
# @param schema the output schema supplied by the caller
# @param index the task index supplied by the caller
# @param page_builder the page builder supplied by the caller
def initialize(task, schema, index, page_builder)
  super(task, schema, index, page_builder)
end
transaction(config) { |task, columns, threads| ... }
# File lib/embulk/input_twitterstream.rb, line 58
# Builds the output schema and yields it, together with the task and the
# thread count, to the caller's block.
#
# Column names come from the 'columns' config entry. When that entry is
# missing or is an empty array, COLUMN_NAMES is used instead. Every column
# is declared with type :string.
#
# @param config configuration object responding to #param; it is also passed
#   on unchanged as the task
# @param control the block, kept in the signature for compatibility (it is
#   invoked through yield)
# @yield [task, columns, threads] task is config, columns is the array of
#   Column objects, threads is always 1
# @return [Hash] always an empty hash; the value returned by the block is
#   not used
def self.transaction(config, &control)
  task = config
  threads = 1

  # Read the option once. The default lets a config without a 'columns' key
  # reach the COLUMN_NAMES fallback instead of failing as a missing field.
  names = config.param('columns', :array, default: [])
  names = COLUMN_NAMES if names.empty?

  columns = names.each_with_index.map do |name, index|
    Column.new(index, name, :string)
  end

  yield(task, columns, threads)
  {}
end

Public Instance Methods

dot_flatten(hash, path = '')
# File lib/embulk/input_twitterstream.rb, line 79
# Collapses a nested hash into a single-level hash whose keys are strings
# made of the nested keys joined with dots, e.g. {a: {b: 1}} => {"a.b" => 1}.
#
# Only Hash values are descended into; every other value (arrays included)
# is stored as-is. A nested hash with no entries contributes no keys.
#
# @param hash [Hash] the (possibly nested) hash to flatten
# @param path [String] prefix prepended to every key; used by the recursion
# @return [Hash] a new flat hash with String keys
def dot_flatten(hash, path = '')
  flat = {}
  hash.each do |name, value|
    full_key = path + name.to_s
    if value.is_a?(Hash)
      # Recurse with the current key plus a trailing dot as the new prefix.
      flat.update(dot_flatten(value, "#{full_key}."))
    else
      flat[full_key] = value
    end
  end
  flat
end
run()
# File lib/embulk/input_twitterstream.rb, line 90
# Streams items from Twitter and adds one page row per received tweet.
#
# Each Twitter::Tweet is converted with to_hash, flattened by dot_flatten,
# and projected onto the schema: for every column the value stored under the
# column's name is stringified, or '' is used when the key is absent.
# Items that are not tweets are ignored.
#
# When the task's 'count' is a positive number, streaming stops after that
# many tweets by raising StopStreamException, which is rescued here. With no
# count (or zero) the stream is read until the client stops yielding.
#
# @return [Hash] an empty commit report
def run
  credentials = {
    consumer_key:        @task['consumer_key'],
    consumer_secret:     @task['consumer_secret'],
    access_token:        @task['access_token'],
    access_token_secret: @task['access_token_secret']
  }
  client = Twitter::Streaming::Client.new(**credentials)

  limit = @task['count']
  remaining = limit ? limit.to_i : 0

  begin
    client.user do |item|
      next unless item.is_a?(Twitter::Tweet)

      flat = dot_flatten(item.to_hash)
      row = @schema.map do |column|
        flat.key?(column.name) ? flat[column.name].to_s : ''
      end
      @page_builder.add(row)

      # A non-positive remaining count means "no limit".
      next unless remaining > 0

      remaining -= 1
      # The exception is the only way out of the streaming block.
      raise StopStreamException if remaining == 0
    end
  rescue StopStreamException
    # Requested number of tweets reached; fall through to finish the page.
  end

  @page_builder.finish
  {}
end