class LogStash::Inputs::Twitter

TODO(sissel): This could use some refactoring.

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/twitter.rb, line 9
def register
  api_url = "https://stream.twitter.com/1/statuses/filter.json"
  @http = EventMachine::HttpRequest.new(api_url)
  @logger.info(["Registering input", { :url => @url, :api_url => api_url, :params => @urlopts }])
  source = "twitter://...#{@url.path}?#{@url.query}"

  if @url.user.nil? or @user.password.nil?
    message = "User and password missing for twitter input #{@url.to_s}")
    @logger.error(message)
    raise message
  end

  req = nil
  connect = proc do
    @logger.info(["Connecting", { :url => @url, :api_url => api_url, :params => @urlopts}])
    req = @http.post :body => @urlopts,
                     :head => { "Authorization" => [ @url.user, @url.password ] }
    buffer = BufferedTokenizer.new

    req.stream do |chunk|
      buffer.extract(chunk).each do |line|
        begin
          tweet = JSON.parse(line)
        rescue JSON::ParserError => e
          @logger.warn("Invalid JSON, aborting connection: #{line}")
          req.errback
          next
        end
        next if !tweet

        event = LogStash::Event.new({
          "@message" => tweet["text"],
          "@type" => @type,
          "@tags" => @tags.clone,
        })

        event.fields.merge!(
          "user" => (tweet["user"]["screen_name"] rescue nil),
          "client" => (tweet["user"]["source"] rescue nil),
          "retweeted" => (tweet["retweeted"] rescue nil)
        )

        event.fields["in-reply-to"] = tweet["in_reply_to_status_id"] if tweet["in_reply_to_status_id"]

        urls = tweet["entities"]["urls"] rescue []
        if urls.size > 0
          event.fields["urls"] = urls.collect { |u| u["url"] }
        end

        event.source = source
        @logger.debug(["Got event", event])
        @callback.call(event)
      end # buffer.extract
    end # req.stream

    req.errback do
      @logger.warn(["Error occurred, not sure what, seriously. Reconnecting!", { :url => @url }])

      req.close_connection() rescue nil

      EventMachine::Timer.new(60) do
        connect.call
      end
    end # req.errback

    req.callback do
      @logger.warn(["Request completed. Unexpected!", { :url => @url }])
    end
  end # connect = proc do

  connect.call
end