class Embulk::InputTwitterStream
Constants
- COLUMN_NAMES
Public Class Methods
new(task, schema, index, page_builder)
click to toggle source
Calls superclass method
# File lib/embulk/input_twitterstream.rb, line 75 def initialize(task, schema, index, page_builder) super end
transaction(config) { |task, columns, threads| ... }
click to toggle source
# File lib/embulk/input_twitterstream.rb, line 58 def self.transaction(config, &control) task = config threads = 1 cols = config.param('columns', :array) if cols.empty? columns = COLUMN_NAMES.map.with_index {|column, index| Column.new(index, column, :string) } else columns = config.param('columns', :array).map.with_index { |column, index| Column.new(index, column, :string) } end commit_reports = yield(task, columns, threads) return {} end
Public Instance Methods
dot_flatten(hash, path = '')
click to toggle source
# File lib/embulk/input_twitterstream.rb, line 79 def dot_flatten(hash, path = '') hash.each_with_object({}) do |(k, v), ret| key = path + k.to_s if v.is_a? Hash ret.merge! dot_flatten(v, key + '.') else ret[key] = v end end end
run()
click to toggle source
# File lib/embulk/input_twitterstream.rb, line 90 def run client = Twitter::Streaming::Client.new( consumer_key: @task['consumer_key'], consumer_secret: @task['consumer_secret'], access_token: @task['access_token'], access_token_secret: @task['access_token_secret'], ) count = @task['count'] ? @task['count'].to_i : 0 begin client.user do |item| case item when Twitter::Tweet tweet = dot_flatten(item.to_hash) @page_builder.add(@schema.map {|column| tweet.has_key?(column.name) ? tweet[column.name].to_s : ''}) if count > 0 count -= 1 if count == 0 raise StopStreamException if count == 0 end end end end rescue StopStreamException end @page_builder.finish commit_report = {} return commit_report end