class Qyu::Store::ActiveRecord::Adapter
Constants
- TYPE
Public Class Methods
new(config)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 17 def initialize(config) init_client(config) end
valid_config?(config)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 10 def valid_config?(config) ConfigurationValidator.new(config).valid? end
Public Instance Methods
clear_completed_jobs()
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 75 def clear_completed_jobs Job.joins(:tasks).where(tasks: { status: 'completed' }).destroy_all end
count_jobs()
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 67 def count_jobs Job.count end
delete_job(id)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 71 def delete_job(id) Job.destroy(id) end
delete_workflow(id)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 38 def delete_workflow(id) Workflow.destroy(id) end
delete_workflow_by_name(name)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 42 def delete_workflow_by_name(name) Workflow.where(name: name).destroy_all end
find_job(id)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 53 def find_job(id) j = Job.find_by(id: id) return if j.nil? wflow = Workflow.find_by(id: j.workflow_id) return if wflow.nil? deserialize_job(j, wflow) end
find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) { |id| ... }
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 80 def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) id = nil transaction do id_payload_combos = Task.where( name: name, queue_name: queue_name, job_id: job_id, parent_task_id: parent_task_id ).pluck(:id, :payload) id_payload_combos.each do |t_id, t_payload| if compare_payloads(t_payload, payload) id = t_id break end end if id.nil? id = Task.create!(name: name, queue_name: queue_name, payload: payload, job_id: job_id, parent_task_id: parent_task_id).id end yield(id) end id end
find_task(id)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 107 def find_task(id) task = Task.find_by(id: id) deserialize_task(task) end
find_task_ids_by_job_id_and_name(job_id, name)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 112 def find_task_ids_by_job_id_and_name(job_id, name) Task.where(job_id: job_id, name: name).pluck(:id) end
find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 116 def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) Task.where(job_id: job_id, name: name, parent_task_id: parent_task_ids).pluck(:id) end
find_workflow(id)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 28 def find_workflow(id) wflow = Workflow.find_by(id: id) deserialize_workflow(wflow) end
find_workflow_by_name(name)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 33 def find_workflow_by_name(name) wflow = Workflow.find_by(name: name) deserialize_workflow(wflow) end
lock_task!(id, lease_time)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 132 def lock_task!(id, lease_time) Qyu.logger.debug '[LOCK] lock_task!' uuid = SecureRandom.uuid Qyu.logger.debug "[LOCK] uuid = #{uuid}" locked_until = seconds_after_time(lease_time) Qyu.logger.debug "[LOCK] locked_until = #{locked_until}" results = Task.where('id = ? AND (locked_until < now() OR locked_until IS NULL)', id).update(locked_by: uuid, locked_until: locked_until) return [nil, nil] if results.empty? locked_until = results[0].locked_until Qyu.logger.debug "[LOCK] locked_until from DB = #{locked_until}" [uuid, locked_until] end
persist_job(workflow, payload)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 47 def persist_job(workflow, payload) with_connection do Job.create!(payload: payload, workflow_id: workflow.id).id end end
persist_workflow(name, descriptor)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 22 def persist_workflow(name, descriptor) with_connection do Workflow.create!(name: name, descriptor: descriptor).id end end
renew_lock_lease(id, lease_time, lease_token)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 156 def renew_lock_lease(id, lease_time, lease_token) Qyu.logger.debug "renew_lock_lease id = #{id}, lease_time = #{lease_time}, lease_token = #{lease_token}" results = with_connection do Task.where('id = ? AND locked_until > now() AND locked_by = ?', id, lease_token).update(locked_until: seconds_after_time(lease_time)) end Qyu.logger.debug "renew_lock_lease results = #{results}" return nil if results.empty? results[0].locked_until end
select_jobs(limit, offset, order = :asc)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 63 def select_jobs(limit, offset, order = :asc) Job.includes(:workflow).order(id: order).limit(limit).offset(offset).as_json(include: :workflow) end
select_tasks_by_job_id(job_id)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 128 def select_tasks_by_job_id(job_id) Task.where(job_id: job_id).as_json end
task_status_counts(job_id)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 120 def task_status_counts(job_id) counts = Task.where(job_id: job_id).group(:name, :status).count counts.each_with_object({}) do |(k, v), obj| obj[k[0]] ||= Hash.new(0) obj[k[0]][k[1]] = v end end
transaction() { || ... }
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 182 def transaction ::ActiveRecord::Base.transaction do yield end end
unlock_task!(id, lease_token)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 151 def unlock_task!(id, lease_token) results = Task.where(id: id, locked_by: lease_token).update(locked_by: nil, locked_until: nil) !results.empty? end
update_status(id, status)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 170 def update_status(id, status) results = Task.where(id: id).update(status: status) results.any? && results[0].status == status end
with_connection() { || ... }
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 176 def with_connection ::ActiveRecord::Base.connection_pool.with_connection do yield end end
Private Instance Methods
compare_payloads(payload1, payload2)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 190 def compare_payloads(payload1, payload2) sort(payload1) == sort(payload2) end
deserialize_job(job, workflow)
click to toggle source
j = JSON.parse(j) j = JSON.parse(j)
# File lib/qyu/store/activerecord/adapter.rb, line 207 def deserialize_job(job, workflow) j = job.as_json j['workflow'] = deserialize_workflow(workflow) j end
deserialize_task(task)
click to toggle source
t = JSON.parse(t)
# File lib/qyu/store/activerecord/adapter.rb, line 199 def deserialize_task(task) return if task.nil? task.as_json end
deserialize_workflow(workflow)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 213 def deserialize_workflow(workflow) return if workflow.nil? wflow = workflow.as_json # wflow['descriptor'] = JSON.parse(wflow['descriptor']) wflow end
init_client(config)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 221 def init_client(config) @@db_configuration = { adapter: config[:db_type], database: config[:db_name], username: config[:db_user], host: config[:db_host], port: config[:db_port], pool: config.fetch(:db_pool) { 5 } } @@db_configuration[:password] = config[:db_password] if config[:db_password] Utils.ensure_db_ready(@@db_configuration) end
seconds_after_time(seconds, start_time = Time.now.utc)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 236 def seconds_after_time(seconds, start_time = Time.now.utc) start_time + seconds end
sort(payload)
click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 194 def sort(payload) payload end