class Mups::Consumer
Attributes
queue[R]
stream_uri[R]
Public Class Methods
new(redis)
click to toggle source
# File lib/mups/consumer.rb, line 9 def initialize(redis) @redis = redis # Try to start the stream with the las mtime enqueued @stream_uri = if @redis.llen("mups:mtime") > 0 mtime = @redis.lpop("mups:mtime") URI("http://stream.meetup.com/2/open_events?since_mtime=#{mtime}") else URI("http://stream.meetup.com/2/open_events") end @queue = Queue.new @logger = Logger.new(STDOUT) end
Public Instance Methods
start()
click to toggle source
# File lib/mups/consumer.rb, line 23 def start Thread.abort_on_exception = true read = Thread.new(queue) do |queue| body = HTTP.get(@stream_uri).body buffer = "" body.each do |chunk| buffer += chunk # Enqueue event data just if we have a complete chunk if buffer.chars.last == "\n" queue.push(buffer) buffer = "" end end # Push nil to signify there's no more data in the queue queue.push nil end consume = Thread.new(queue) do |queue| while queue_data = queue.pop break if queue_data.is_a? NilClass begin title = JSON.parse(queue_data)["name"] count = @redis.lpush("mups:titles", title) mtime = JSON.parse(queue_data)["mtime"] @redis.lpush("mups:mtime", mtime) @logger.info("REDIS -> list count #{count}") rescue JSON::ParserError @logger.error("Couldn't parse JSON stream, not saving...") end end end [read, consume].map(&:join) end