class ActiveJob::QueueAdapters::GoogleCloudrunTasksAdapter
Public Class Methods
new()
click to toggle source
# File lib/google_cloud_run/job_adapter.rb, line 6
# Builds the adapter: creates the Cloud Tasks client, caches the project id,
# the default service account email and the google_cloudrun job settings,
# then validates the configured callback URL.
#
# @raise [RuntimeError] when job_callback_url is blank, does not end with
#   the configured job_callback_path, or does not start with "https://"
def initialize
  @client = Google::Cloud::Tasks.cloud_tasks
  @project_id = GoogleCloudRun.project_id
  @service_account_email = GoogleCloudRun.default_service_account_email

  settings = Rails.application.config.google_cloudrun
  @default_job_timeout_sec = settings.job_timeout_sec
  @job_callback_url = settings.job_callback_url
  @queue_default_region = settings.job_queue_default_region

  # The callback URL has to point at the path the job endpoint is mounted on.
  callback_path = settings.job_callback_path
  unless @job_callback_url.present? && @job_callback_url.end_with?(callback_path)
    raise "Set config.google_cloudrun.job_callback_url to 'https://your-domain.com#{callback_path}'"
  end

  return if @job_callback_url.start_with?("https://")

  raise "config.google_cloudrun.job_callback_url must start with https://"
end
Public Instance Methods
enqueue(job)
click to toggle source
# File lib/google_cloud_run/job_adapter.rb, line 23
# Sends the job through create_cloudtask without a schedule time.
# The job class's own timeout (see local_timeout) wins over the
# configured default timeout.
#
# @param job the job to enqueue; must respond to job_id, queue_name and serialize
def enqueue(job)
  timeout_sec = local_timeout(job) || @default_job_timeout_sec
  payload = job.serialize

  create_cloudtask(job.class, job.job_id, job.queue_name, timeout_sec, nil, payload)
end
enqueue_at(job, timestamp)
click to toggle source
# File lib/google_cloud_run/job_adapter.rb, line 32
# Sends the job through create_cloudtask with `timestamp` as its schedule time.
# The job class's own timeout (see local_timeout) wins over the
# configured default timeout.
#
# @param job the job to enqueue; must respond to job_id, queue_name and serialize
# @param timestamp the time to run at, passed on unchanged as scheduled_at
def enqueue_at(job, timestamp)
  timeout_sec = local_timeout(job) || @default_job_timeout_sec
  payload = job.serialize

  create_cloudtask(job.class, job.job_id, job.queue_name, timeout_sec, timestamp, payload)
end
Private Instance Methods
build_task_request(name, url, service_account_email, body, job_timeout, scheduled_at)
click to toggle source
# File lib/google_cloud_run/job_adapter.rb, line 71
# Assembles the task hash handed to the Cloud Tasks client.
# ref: https://cloud.google.com/tasks/docs/reference/rest/v2/projects.locations.queues.tasks#Task
#
# @param name full task resource name
# @param url target of the POST request
# @param service_account_email account used for the OIDC token
# @param body request body sent as application/json
# @param job_timeout dispatch deadline; converted with to_i (whole seconds)
# @param scheduled_at optional run time; when falsy no schedule_time is set
# @return [Hash] task description with :name, :http_request,
#   :dispatch_deadline and, if scheduled, :schedule_time
def build_task_request(name, url, service_account_email, body, job_timeout, scheduled_at)
  deadline = Google::Protobuf::Duration.new
  deadline.seconds = job_timeout.to_i

  task = {
    name: name,
    http_request: {
      oidc_token: { service_account_email: service_account_email },
      headers: { "Content-Type": "application/json" },
      http_method: "POST",
      url: url,
      body: body,
    },
    dispatch_deadline: deadline,
  }
  return task unless scheduled_at

  # Only whole seconds are carried over; the fractional part is dropped by to_i.
  run_at = Google::Protobuf::Timestamp.new
  run_at.seconds = Time.at(scheduled_at).utc.to_i
  task[:schedule_time] = run_at
  task
end
create_cloudtask(job_name, job_id, full_queue_name, job_timeout, scheduled_at, job)
click to toggle source
# File lib/google_cloud_run/job_adapter.rb, line 43
# Creates one Cloud Tasks task for a job and logs the hand-off.
# Does nothing when config.google_cloudrun.jobs is falsy.
#
# @param job_name used only in log and error messages
# @param job_id used as the last segment of the task name
# @param full_queue_name "region/queue" or a bare queue name
#   (resolved by parse_full_queue_name)
# @param job_timeout forwarded to build_task_request
# @param scheduled_at forwarded to build_task_request (nil = no schedule time)
# @param job payload; its to_json becomes the request body
# @raise [RuntimeError] when create_task raises or returns nil
def create_cloudtask(job_name, job_id, full_queue_name, job_timeout, scheduled_at, job)
  return unless Rails.application.config.google_cloudrun.jobs

  region, queue_name = parse_full_queue_name(full_queue_name)
  parent = @client.queue_path(project: @project_id, location: region, queue: queue_name)

  task_name = "projects/#{@project_id}/locations/#{region}/queues/#{queue_name}/tasks/#{job_id}"
  task = build_task_request(
    task_name,
    @job_callback_url,
    @service_account_email,
    job.to_json,
    job_timeout,
    scheduled_at,
  )

  job_label = "#{job_name}(#{job_id})"
  queue_label = "'#{region}/#{queue_name}'"

  response =
    begin
      @client.create_task(parent: parent, task: task)
    rescue => e
      # Re-raised as a plain RuntimeError that names the job and the queue.
      raise "Failed sending job #{job_label} to queue #{queue_label}. #{e.message}"
    end

  if response.nil?
    raise "Failed sending job #{job_label} to queue #{queue_label}. Google didn't return a response."
  end

  Rails.logger&.info("Job #{job_label} sent to queue #{queue_label}")
end
local_timeout(job)
click to toggle source
# File lib/google_cloud_run/job_adapter.rb, line 116
# Looks up a timeout declared on the job's class.
#
# Reads the class variable @@google_cloudrun_job_timeout from job.class
# (class variables are also visible from subclasses). The presence check is
# explicit so that unrelated errors are no longer silently swallowed by a
# blanket rescue.
#
# @param job any object; only its class is inspected
# @return the stored value, or nil when the class variable is not defined
def local_timeout(job)
  job_class = job.class
  return nil unless job_class.class_variable_defined?(:@@google_cloudrun_job_timeout)

  job_class.class_variable_get(:@@google_cloudrun_job_timeout)
end
parse_full_queue_name(queue_name)
click to toggle source
# File lib/google_cloud_run/job_adapter.rb, line 97
# Resolves a queue name into a [region, queue] pair.
#
# Accepts either "region/queue" or a bare queue name; a bare name is paired
# with @queue_default_region. Underscores are replaced by hyphens first.
#
# @param queue_name [String] queue name as configured on the job
# @return [Array(String, String)] region and queue name
# @raise [RuntimeError] when the name contains "/" but is not exactly
#   "region/queue" with both parts non-empty, or when no region is given
#   and no default region is configured
def parse_full_queue_name(queue_name)
  # config.active_job.queue_name_prefix will add an underscore,
  # queue names can't have underscores. Let's turn it into a hyphen.
  queue_name = queue_name.tr("_", "-")

  # see if we have something like this: region/queue
  if queue_name.include?("/")
    # A negative limit keeps trailing empty segments, so "region/" is seen
    # as ["region", ""] and rejected instead of falling through to the
    # default region with a queue name that still contains a slash.
    region, queue, *extra = queue_name.split("/", -1)
    if !extra.empty? || region.empty? || queue.empty?
      raise "queue_as \"#{queue_name}\" is malformed, expected \"region/queue\""
    end

    return region, queue
  end

  if @queue_default_region.blank?
    raise "queue_as \"#{queue_name}\" needs region: \"region/#{queue_name}\" or set config.google_cloudrun.job_queue_default_region"
  end

  # use our default region
  [@queue_default_region, queue_name]
end