class BackRun::Pubsub
Public Class Methods
new()
click to toggle source
# File lib/back_run/pubsub.rb, line 6 def initialize @pubsub = Google::Cloud::Pubsub.new( project_id: Rails.application.config.back_run[:project_id], credentials: Rails.application.config.back_run[:credentials_path] ) end
Public Instance Methods
kill_job(job)
click to toggle source
# File lib/back_run/pubsub.rb, line 36 def kill_job(job) BackRun.logger.info("Moving to morgue queue #{job.klass}") publish(job, 'morgue') end
publish(job, topic = nil)
click to toggle source
# File lib/back_run/pubsub.rb, line 13 def publish(job, topic = nil) topic_name = topic || job.queue_name topic = fetch_topic(topic_name) topic.publish(job.to_json) end
subscribe(worker)
click to toggle source
The gem is using Concurrent::CachedThreadPool which overrides the max_threads, so it's ignoring the stream configuration used in the listen method. It may cause that lot's of threads are created when the traffic increases. This PR github.com/googleapis/google-cloud-ruby/pull/3682 fixes that but it's not released yet.
# File lib/back_run/pubsub.rb, line 24 def subscribe(worker) worker.queues.each do |queue| subscription = subscription_for(queue) subscriber = subscription.listen do |message| job = Job.from_json(message.data) worker.message_received(job, ack_callback(job, message), modify_ack_callback(message)) end subscriber.on_error { |error| BackRun.logger.error(error) } subscriber.start end end
Private Instance Methods
ack_callback(job, message)
click to toggle source
# File lib/back_run/pubsub.rb, line 48 def ack_callback(job, message) proc do BackRun.logger.info("Message received from queue: #{job.queue_name}") message.ack! end end
fetch_topic(topic_name)
click to toggle source
# File lib/back_run/pubsub.rb, line 62 def fetch_topic(topic_name) @pubsub.topic(topic_name) || @pubsub.create_topic(topic_name) end
modify_ack_callback(message)
click to toggle source
# File lib/back_run/pubsub.rb, line 55 def modify_ack_callback(message) proc do |seconds| BackRun.logger.info("Running job in #{seconds} seconds") message.modify_ack_deadline!(seconds) end end
subscription_for(queue)
click to toggle source
# File lib/back_run/pubsub.rb, line 43 def subscription_for(queue) topic = fetch_topic(queue) topic.subscription("worker_#{queue}") || topic.subscribe("worker_#{queue}") end