class Magellan::Gcs::Proxy::PubsubSustainer
Attributes
delay[R]
interval[R]
message[R]
next_deadline[R]
next_limit[R]
Public Class Methods
new(message, delay: 10, interval: nil)
click to toggle source
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 33
#
# Builds a sustainer for +message+.
#
# @param message [Object] the message whose +delay!+ is invoked by #send_delay
# @param delay [#to_i, nil] value handed to +message.delay!+ and added (as seconds)
#   to the current time to compute +next_deadline+; +nil+ falls back to 10
# @param interval [#to_f, nil] seconds to wait between two +delay!+ calls;
#   defaults to 90% of +delay+
def initialize(message, delay: 10, interval: nil)
  @message = message
  # An explicit nil (e.g. read from a config hash that has no 'delay' key)
  # would otherwise turn into 0 through nil.to_i, which makes the interval 0.0
  # and lets the sustaining loop spin without ever waiting.
  @delay = (delay || 10).to_i
  @interval = (interval || @delay * 0.9).to_f
end
run(message) { || ... }
click to toggle source
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 13
#
# Runs the given block and, when Proxy.config[:sustainer] is set, keeps a
# background thread alive for the duration of the block that runs a sustainer
# instance for +message+.
#
# @param message [Object] forwarded to +new+ inside the background thread
# @yield the work to perform while the sustainer is running
# @return [Object] the value of the block
# @raise [RuntimeError] when no block is given
def run(message)
  raise "#{name}.run requires block" unless block_given?

  config = Proxy.config[:sustainer]
  return yield unless config

  ready = Queue.new
  sustainer = Thread.new(message, config['delay'], config['interval']) do |msg, delay, interval|
    Thread.current[:processing_message] = true
    # Tell the caller that the flag is in place. Without this handshake a
    # block that finishes before this thread is scheduled would have its
    # "false" overwritten by the line above, and the join below would never
    # return.
    ready << true
    new(msg, delay: delay, interval: interval).run
  end
  ready.pop

  begin
    yield
  ensure
    # Clearing the flag is what makes the sustainer leave its loop.
    sustainer[:processing_message] = false
    sustainer.join
  end
end
Public Instance Methods
debug(msg)
click to toggle source
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 75
#
# Writes +msg+ to the logger at debug level, prefixed with this object's
# class name.
#
# @param msg [#to_s] text to log
def debug(msg)
  prefixed = "#{self.class.name} #{msg}"
  logger.debug(prefixed)
end
reset_next_limit()
click to toggle source
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 56
#
# Recomputes both time marks from the current wall-clock time:
# +next_limit+ is now + interval, +next_deadline+ is now + delay
# (both as Float epoch seconds).
#
# @return [Float] the new +next_deadline+
def reset_next_limit
  # Take a single timestamp so both marks share the same base.
  base = Time.now.to_f
  @next_limit = base + interval
  @next_deadline = base + delay
end
run()
click to toggle source
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 39
#
# Main loop of the sustainer: waits via #wait_while_processing, then calls
# #send_delay and resets the time marks, until #wait_while_processing reports
# that processing is over. Any StandardError is logged instead of propagated.
def run
  reset_next_limit
  loop do
    debug("is sleeping #{interval} sec.")
    if wait_while_processing
      send_delay
      reset_next_limit
    else
      debug('is stopping.')
      break
    end
  end
  debug('stopped.')
rescue StandardError => e
  # Errors are logged and swallowed here, so nothing propagates out of #run.
  logger.error(e)
end
send_delay()
click to toggle source
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 62
#
# Calls +message.delay!+ with +delay+. On Google::Apis::ServerError the call
# is retried once per second for as long as the current time is before
# +next_deadline+; after that the error is re-raised.
#
# @raise [Google::Apis::ServerError] when the deadline passes without success
def send_delay
  debug("is sending delay!(#{delay})")
  message.delay! delay
  debug("sent delay!(#{delay}) successfully")
rescue Google::Apis::ServerError => e
  # Give up once the deadline computed by #reset_next_limit has passed.
  raise e unless Time.now.to_f < next_deadline

  sleep(1) # retry interval
  debug("is retrying to send delay! cause of [#{e.class.name}] #{e.message}")
  retry
end
wait_while_processing()
click to toggle source
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 79
#
# Polls every 0.1 seconds until +next_limit+ is reached.
#
# @return [Boolean] +true+ when +next_limit+ was reached, +false+ as soon as
#   the current thread's +:processing_message+ flag is found to be falsy
def wait_while_processing
  loop do
    return true unless Time.now.to_f < next_limit
    # The flag lives on the thread running this method; it is cleared from
    # outside to request a stop.
    return false unless Thread.current[:processing_message]

    sleep(0.1)
  end
end