class Magellan::Gcs::Proxy::PubsubSustainer

Attributes

delay[R]
interval[R]
message[R]
next_deadline[R]
next_limit[R]

Public Class Methods

new(message, delay: 10, interval: nil)
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 33
# Builds a sustainer for a single message.
#
# @param message [Object] stored as-is; other methods of this class call
#   +delay!+ on it.
# @param delay [#to_i, nil] number of seconds handed to +message.delay!+.
#   +nil+ falls back to the default of 10.
# @param interval [#to_f, nil] seconds to wait between two +delay!+ calls.
#   +nil+ means 90% of +delay+.
def initialize(message, delay: 10, interval: nil)
  @message = message
  # An explicit nil must not bypass the keyword default: the class-level
  # `run` forwards c['delay'] even when the config has no such entry, and
  # nil.to_i would silently turn that into a delay (and interval) of zero.
  @delay = (delay || 10).to_i
  @interval = (interval || @delay * 0.9).to_f
end
run(message) { || ... }
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 13
# Runs the given block while a background thread keeps a sustainer running
# for +message+ (class-level entry point).
#
# When Proxy.config[:sustainer] is not set, the block is simply yielded.
# Otherwise a worker thread builds a sustainer from the config's 'delay' and
# 'interval' entries and runs it until the block has finished.
#
# @param message [Object] forwarded to the sustainer's constructor.
# @yield the work to perform while the sustainer is running.
# @return [Object] the value of the block.
# @raise [RuntimeError] when no block is given.
def run(message)
  raise "#{name}.run requires block" unless block_given?

  config = Proxy.config[:sustainer]
  return yield unless config

  started = Queue.new
  worker = Thread.new(message, config['delay'], config['interval']) do |msg, delay, interval|
    Thread.current[:processing_message] = true
    started << true
    new(msg, delay: delay, interval: interval).run
  end
  # Wait until the worker has raised its flag. Without this handshake a block
  # that finishes quickly lets the `ensure` below clear the flag first; the
  # worker would then set it back to true and `join` would never return.
  started.pop

  begin
    yield
  ensure
    # Signal the worker to stop, then wait for it to wind down.
    worker[:processing_message] = false
    worker.join
  end
end

Public Instance Methods

debug(msg)
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 75
# Writes +msg+ to the debug log, prefixed with this object's class name.
#
# @param msg [#to_s] text to log.
# @return [Object] whatever +logger.debug+ returns.
def debug(msg)
  prefix = self.class.name
  logger.debug("#{prefix} #{msg}")
end
reset_next_limit()
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 56
# Re-bases both timestamps on the current wall-clock time:
# +next_limit+ becomes now + +interval+ and +next_deadline+ becomes
# now + +delay+ (both as Float seconds since the epoch).
#
# @return [Float] the new +next_deadline+.
def reset_next_limit
  base = Time.now.to_f
  @next_limit = base + interval
  @next_deadline = base + delay
end
run()
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 39
# Main loop of the sustainer.
#
# After arming the timestamps it repeatedly waits via
# +wait_while_processing+. Each time the wait reports true it sends a delay
# and re-arms; once it reports false the loop ends. Any StandardError raised
# along the way is logged with +logger.error+ and not re-raised.
#
# @return [Object] the result of the last logging call.
def run
  reset_next_limit
  loop do
    debug("is sleeping #{interval} sec.")
    if wait_while_processing
      send_delay
      reset_next_limit
    else
      debug('is stopping.')
      break
    end
  end
  debug('stopped.')
rescue => e
  logger.error(e)
end
send_delay()
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 62
# Calls +message.delay!+ with +delay+, logging before and after.
# (Presumably this extends the message's acknowledgement deadline; confirm
# against the Pub/Sub client in use.)
#
# On Google::Apis::ServerError the call is retried after a one-second pause
# for as long as the current time is still before +next_deadline+.
#
# @return [Object] the result of the final +debug+ call.
# @raise [Google::Apis::ServerError] when the error occurs at or after
#   +next_deadline+.
def send_delay
  debug("is sending delay!(#{delay})")
  message.delay! delay
  debug("sent delay!(#{delay}) successfully")
rescue Google::Apis::ServerError => e
  # Out of time: hand the error to the caller.
  raise e unless Time.now.to_f < next_deadline

  sleep(1) # retry interval
  debug("is retrying to send delay! cause of [#{e.class.name}] #{e.message}")
  retry
end
wait_while_processing()
# File lib/magellan/gcs/proxy/pubsub_sustainer.rb, line 79
# Polls every 0.1 seconds until +next_limit+ is reached or the current
# thread's :processing_message flag turns falsy.
#
# @return [Boolean] true when +next_limit+ was reached, false when the flag
#   was found falsy first.
def wait_while_processing
  loop do
    return true unless Time.now.to_f < next_limit
    return false unless Thread.current[:processing_message]

    sleep(0.1)
  end
end