class Akane::Recorder
Public Class Methods
new(storages, timeout: 20, logger: Logger.new(nil))
click to toggle source
# File lib/akane/recorder.rb, line 8 def initialize(storages, timeout: 20, logger: Logger.new(nil)) @storages = storages @logger = logger @queue = Queue.new @recently_performed = RoundrobinFlags.new(1000) @timeout = timeout @stop = false end
Public Instance Methods
dequeue(raise_errors = false)
click to toggle source
# File lib/akane/recorder.rb, line 45 def dequeue(raise_errors = false) perform(*@queue.pop, raise_errors: raise_errors) end
mark_as_deleted(account, user_id, tweet_id)
click to toggle source
# File lib/akane/recorder.rb, line 27 def mark_as_deleted(account, user_id, tweet_id) return self if @stop @queue << [:mark_as_deleted, account, user_id, tweet_id] self end
perform(action, account, *payload, raise_errors: false)
click to toggle source
# File lib/akane/recorder.rb, line 49 def perform(action, account, *payload, raise_errors: false) if action == :record_tweet tweet = payload.last return if @recently_performed[tweet.id] @recently_performed.flag!(tweet.id) if tweet.retweet? perform(:record_tweet, account, tweet.retweeted_status, raise_errors: raise_errors) end end @storages.each do |storage| begin Timeout.timeout(@timeout) do storage.__send__(action, account, *payload) end rescue Timeout::Error => e raise e if raise_errors @logger.warn "#{storage.name} (#{action}) timed out" rescue Interrupt, SignalException, SystemExit => e raise e rescue Exception => e raise e if raise_errors @logger.error "Error while recorder performing to #{storage.inspect}: #{e.inspect}" @logger.error e.backtrace end end end
queue_length()
click to toggle source
# File lib/akane/recorder.rb, line 17 def queue_length @queue.size end
record_event(account, event)
click to toggle source
# File lib/akane/recorder.rb, line 39 def record_event(account, event) return self if @stop @queue << [:record_event, account, event] self end
record_message(account, message)
click to toggle source
# File lib/akane/recorder.rb, line 33 def record_message(account, message) return self if @stop @queue << [:record_message, account, message] self end
record_tweet(account, tweet)
click to toggle source
# File lib/akane/recorder.rb, line 21 def record_tweet(account, tweet) return self if @stop @queue << [:record_tweet, account, tweet] self end
run(raise_errors = false)
click to toggle source
# File lib/akane/recorder.rb, line 81 def run(raise_errors = false) @running_thread = Thread.new do loop do begin begin self.dequeue(raise_errors) rescue Interrupt, SignalException, Stop end if @stop break if self.queue_length.zero? @logger.info "processing queue: #{self.queue_length} remaining." end rescue Exception => e raise e if raise_errors @logger.error "Error while recorder dequing: #{e.inspect}" @logger.error e.backtrace end end @logger.info "Recorder stopped." @stop = false end @running_thread.join nil end
stop!()
click to toggle source
# File lib/akane/recorder.rb, line 109 def stop! @stop = true @running_thread.raise Stop end