class Qyu::Store::ActiveRecord::Adapter

Constants

TYPE

Public Class Methods

new(config) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 17
# Sets up the adapter by handing +config+ straight to init_client.
#
# @param config the store configuration; init_client reads the :db_type,
#   :db_name, :db_user, :db_host, :db_port, :db_pool and :db_password keys
def initialize(config)
  init_client(config)
end
valid_config?(config) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 10
# Delegates validation of +config+ to ConfigurationValidator.
#
# @param config the store configuration to check
# @return the value of ConfigurationValidator#valid? for that configuration
def valid_config?(config)
  validator = ConfigurationValidator.new(config)
  validator.valid?
end

Public Instance Methods

clear_completed_jobs() click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 75
# Destroys the jobs selected by joining their tasks and filtering on the
# task status 'completed'.
# NOTE(review): the join selects a job as soon as one of its tasks is
# 'completed' — confirm that is the intended meaning of a completed job.
#
# @return the result of destroy_all on the filtered relation
def clear_completed_jobs
  completed_tasks = { tasks: { status: 'completed' } }
  Job.joins(:tasks).where(completed_tasks).destroy_all
end
count_jobs() click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 67
# Reports how many jobs exist.
#
# @return the value of Job.count
def count_jobs
  Job.count
end
delete_job(id) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 71
# Removes a single job by calling Job.destroy with the given id.
#
# @param id identifier of the job to destroy
# @return the result of Job.destroy
def delete_job(id)
  Job.destroy(id)
end
delete_workflow(id) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 38
# Removes a single workflow by calling Workflow.destroy with the given id.
#
# @param id identifier of the workflow to destroy
# @return the result of Workflow.destroy
def delete_workflow(id)
  Workflow.destroy(id)
end
delete_workflow_by_name(name) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 42
# Destroys every workflow whose name equals +name+.
#
# @param name workflow name to match
# @return the result of destroy_all on the matching relation
def delete_workflow_by_name(name)
  named_workflows = Workflow.where(name: name)
  named_workflows.destroy_all
end
find_job(id) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 53
# Looks up a job together with the workflow it references.
#
# @param id identifier of the job
# @return the value of deserialize_job for the job and its workflow, or nil
#   when either the job or its workflow cannot be found
def find_job(id)
  job_record = Job.find_by(id: id)
  return unless job_record

  workflow_record = Workflow.find_by(id: job_record.workflow_id)
  return unless workflow_record

  deserialize_job(job_record, workflow_record)
end
find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) { |id| ... } click to toggle source

Task

# File lib/qyu/store/activerecord/adapter.rb, line 80
# Returns the id of the task matching the given attributes and payload,
# creating the task when no stored payload matches. The lookup, the optional
# insert and the caller's block all run inside one transaction.
#
# @param name task name
# @param queue_name queue the task belongs to
# @param payload payload compared against stored payloads via compare_payloads
# @param job_id id of the owning job
# @param parent_task_id id of the parent task (may be nil)
# @yield [id] the id of the found or newly created task
# @return the task id
def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
  task_id = nil

  transaction do
    lookup = { name: name, queue_name: queue_name, job_id: job_id, parent_task_id: parent_task_id }
    candidates = Task.where(lookup).pluck(:id, :payload)

    # First candidate whose stored payload matches wins.
    match = candidates.find { |_, stored_payload| compare_payloads(stored_payload, payload) }
    task_id = match.first unless match.nil?

    if task_id.nil?
      created = Task.create!(
        name: name,
        queue_name: queue_name,
        payload: payload,
        job_id: job_id,
        parent_task_id: parent_task_id
      )
      task_id = created.id
    end

    yield(task_id)
  end

  task_id
end
find_task(id) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 107
# Fetches a task by id and passes it through deserialize_task.
#
# @param id identifier of the task
# @return the value of deserialize_task for the looked-up record
def find_task(id)
  deserialize_task(Task.find_by(id: id))
end
find_task_ids_by_job_id_and_name(job_id, name) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 112
# Collects the ids of the tasks of one job that carry the given name.
#
# @param job_id id of the owning job
# @param name task name to match
# @return the result of plucking :id from the filtered tasks
def find_task_ids_by_job_id_and_name(job_id, name)
  named_tasks = Task.where(job_id: job_id, name: name)
  named_tasks.pluck(:id)
end
find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 116
# Collects the ids of the tasks of one job that carry the given name and whose
# parent_task_id is matched against +parent_task_ids+.
#
# @param job_id id of the owning job
# @param name task name to match
# @param parent_task_ids value passed as the parent_task_id condition
# @return the result of plucking :id from the filtered tasks
def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
  child_tasks = Task.where(job_id: job_id, name: name, parent_task_id: parent_task_ids)
  child_tasks.pluck(:id)
end
find_workflow(id) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 28
# Fetches a workflow by id and passes it through deserialize_workflow.
#
# @param id identifier of the workflow
# @return the value of deserialize_workflow for the looked-up record
def find_workflow(id)
  deserialize_workflow(Workflow.find_by(id: id))
end
find_workflow_by_name(name) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 33
# Fetches a workflow by name and passes it through deserialize_workflow.
#
# @param name workflow name to look up
# @return the value of deserialize_workflow for the looked-up record
def find_workflow_by_name(name)
  deserialize_workflow(Workflow.find_by(name: name))
end
lock_task!(id, lease_time) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 132
# Attempts to take the lease on a task for +lease_time+ seconds.
#
# The claim is issued as one conditional UPDATE (update_all), so the "is the
# task free?" check and the write happen in the same statement. Loading the
# matching rows and saving them one by one would let two workers that both
# read the row as free each believe they own the lock.
# NOTE(review): update_all bypasses model callbacks/validations and does not
# touch updated_at — confirm nothing on Task relies on them for lock writes.
#
# @param id identifier of the task to lock
# @param lease_time number of seconds the lease should last
# @return [Array] [lease_token, locked_until] on success, [nil, nil] when the
#   task is currently locked by someone else or does not exist
def lock_task!(id, lease_time)
  Qyu.logger.debug '[LOCK] lock_task!'

  uuid = SecureRandom.uuid
  Qyu.logger.debug "[LOCK] uuid = #{uuid}"

  locked_until = seconds_after_time(lease_time)
  Qyu.logger.debug "[LOCK] locked_until = #{locked_until}"

  claimed = Task.where('id = ? AND (locked_until < now() OR locked_until IS NULL)', id)
                .update_all(locked_by: uuid, locked_until: locked_until)

  return [nil, nil] if claimed.zero?

  # Read the expiry back using our unique token so the caller gets the value
  # actually stored for this lease.
  locked_until = Task.where(id: id, locked_by: uuid).pluck(:locked_until).first
  return [nil, nil] if locked_until.nil?

  Qyu.logger.debug "[LOCK] locked_until from DB = #{locked_until}"

  [uuid, locked_until]
end
persist_job(workflow, payload) click to toggle source

Job

# File lib/qyu/store/activerecord/adapter.rb, line 47
# Creates a job for +workflow+ inside with_connection.
#
# @param workflow object whose #id is stored as the job's workflow_id
# @param payload payload stored on the job
# @return the id of the newly created job
def persist_job(workflow, payload)
  with_connection do
    job_record = Job.create!(payload: payload, workflow_id: workflow.id)
    job_record.id
  end
end
persist_workflow(name, descriptor) click to toggle source

Workflow

# File lib/qyu/store/activerecord/adapter.rb, line 22
# Creates a workflow inside with_connection.
#
# @param name workflow name
# @param descriptor workflow descriptor stored on the record
# @return the id of the newly created workflow
def persist_workflow(name, descriptor)
  with_connection do
    workflow_record = Workflow.create!(name: name, descriptor: descriptor)
    workflow_record.id
  end
end
renew_lock_lease(id, lease_time, lease_token) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 156
# Extends the lease on a task, provided +lease_token+ still holds an
# unexpired lock on it.
#
# The renewal is issued as one conditional UPDATE (update_all). Checking the
# token with a SELECT and then saving the row by id alone would allow the
# lease to expire and be taken by another worker in between, after which the
# save would overwrite the new holder's expiry.
# NOTE(review): update_all bypasses model callbacks/validations and does not
# touch updated_at — confirm nothing on Task relies on them for lock writes.
#
# @param id identifier of the locked task
# @param lease_time number of seconds the renewed lease should last
# @param lease_token token returned when the lock was taken
# @return the stored locked_until after renewal, or nil when nothing was renewed
def renew_lock_lease(id, lease_time, lease_token)
  Qyu.logger.debug "renew_lock_lease id = #{id}, lease_time = #{lease_time}, lease_token = #{lease_token}"

  updated, renewed_until = with_connection do
    count = Task.where('id = ? AND locked_until > now() AND locked_by = ?', id, lease_token)
                .update_all(locked_until: seconds_after_time(lease_time))
    stored = count.zero? ? nil : Task.where(id: id, locked_by: lease_token).pluck(:locked_until).first
    [count, stored]
  end

  Qyu.logger.debug "renew_lock_lease results = #{updated}"

  renewed_until
end
select_jobs(limit, offset, order = :asc) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 63
# Returns one page of jobs, each serialized together with its workflow.
#
# @param limit maximum number of jobs to return
# @param offset number of jobs to skip
# @param order direction applied to the id ordering (defaults to :asc)
# @return the as_json representation of the selected jobs including :workflow
def select_jobs(limit, offset, order = :asc)
  page = Job.includes(:workflow)
            .order(id: order)
            .limit(limit)
            .offset(offset)
  page.as_json(include: :workflow)
end
select_tasks_by_job_id(job_id) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 128
# Returns all tasks of a job in their as_json representation.
#
# @param job_id id of the owning job
# @return the as_json representation of the matching tasks
def select_tasks_by_job_id(job_id)
  job_tasks = Task.where(job_id: job_id)
  job_tasks.as_json
end
task_status_counts(job_id) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 120
# Summarizes how many tasks of a job are in each status, grouped by task name.
#
# @param job_id id of the job whose tasks are counted
# @return [Hash] task name => { status => count }; the inner hashes answer 0
#   for statuses that were not present
def task_status_counts(job_id)
  grouped = Task.where(job_id: job_id).group(:name, :status).count

  grouped.each_with_object({}) do |((task_name, task_status), total), summary|
    per_task = (summary[task_name] ||= Hash.new(0))
    per_task[task_status] = total
  end
end
transaction() { || ... } click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 182
# Runs the given block inside ::ActiveRecord::Base.transaction.
#
# @yield the work to perform within the transaction
# @return whatever ::ActiveRecord::Base.transaction returns
def transaction
  ::ActiveRecord::Base.transaction { yield }
end
unlock_task!(id, lease_token) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 151
# Releases the lock on a task if it is still held by +lease_token+.
#
# The release is issued as one conditional UPDATE (update_all) so the token
# check is part of the write itself; loading the row first and saving it by id
# afterwards could clear a lock that another worker acquired in between.
# NOTE(review): update_all bypasses model callbacks/validations and does not
# touch updated_at — confirm nothing on Task relies on them for lock writes.
#
# @param id identifier of the locked task
# @param lease_token token returned when the lock was taken
# @return [Boolean] true when a row was unlocked, false otherwise
def unlock_task!(id, lease_token)
  released = Task.where(id: id, locked_by: lease_token)
                 .update_all(locked_by: nil, locked_until: nil)
  released > 0
end
update_status(id, status) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 170
# Sets the status of a task and reports whether the change is reflected on
# the updated record.
#
# @param id identifier of the task
# @param status new status value
# @return [Boolean] false when no task matched; otherwise whether the first
#   updated record now reports +status+
def update_status(id, status)
  updated = Task.where(id: id).update(status: status)
  return false unless updated.any?

  updated[0].status == status
end
with_connection() { || ... } click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 176
# Runs the given block through the connection pool's with_connection.
#
# @yield the work to perform while a pooled connection is held
# @return whatever the pool's with_connection returns
def with_connection
  pool = ::ActiveRecord::Base.connection_pool
  pool.with_connection { yield }
end

Private Instance Methods

compare_payloads(payload1, payload2) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 190
# Tells whether two payloads are equal after each has been passed through sort.
#
# @param payload1 first payload
# @param payload2 second payload
# @return [Boolean] result of comparing the two normalized payloads with ==
def compare_payloads(payload1, payload2)
  left = sort(payload1)
  right = sort(payload2)
  left == right
end
deserialize_job(job, workflow) click to toggle source

j = JSON.parse(j)

# File lib/qyu/store/activerecord/adapter.rb, line 207
# Builds the hash form of a job and embeds its workflow under 'workflow'.
#
# @param job record responding to #as_json
# @param workflow record passed through deserialize_workflow
# @return the job's as_json hash with the 'workflow' key set
def deserialize_job(job, workflow)
  job.as_json.tap do |job_hash|
    job_hash['workflow'] = deserialize_workflow(workflow)
  end
end
deserialize_task(task) click to toggle source

t = JSON.parse(t)

# File lib/qyu/store/activerecord/adapter.rb, line 199
# Converts a task record to its as_json form.
#
# @param task record responding to #as_json, or nil
# @return the task's as_json value, or nil when +task+ is nil
def deserialize_task(task)
  task.as_json unless task.nil?
end
deserialize_workflow(workflow) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 213
# Converts a workflow record to its as_json form. The descriptor is returned
# exactly as as_json produced it; no additional JSON parsing is applied.
#
# @param workflow record responding to #as_json, or nil
# @return the workflow's as_json value, or nil when +workflow+ is nil
def deserialize_workflow(workflow)
  workflow.as_json unless workflow.nil?
end
init_client(config) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 221
# Translates the store configuration into database connection settings,
# remembers them in @@db_configuration and passes them to
# Utils.ensure_db_ready.
#
# @param config configuration read via [] for :db_type, :db_name, :db_user,
#   :db_host, :db_port and :db_password, and via fetch for :db_pool
#   (pool size falls back to 5 when :db_pool is absent)
def init_client(config)
  settings = {
    adapter:  config[:db_type],
    database: config[:db_name],
    username: config[:db_user],
    host:     config[:db_host],
    port:     config[:db_port],
    pool:     config.fetch(:db_pool) { 5 }
  }

  # The password key is only present when one was configured.
  settings[:password] = config[:db_password] if config[:db_password]

  @@db_configuration = settings
  Utils.ensure_db_ready(@@db_configuration)
end
seconds_after_time(seconds, start_time = Time.now.utc) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 236
# Computes the moment that lies +seconds+ after +start_time+.
#
# @param seconds amount added to +start_time+
# @param start_time starting point; defaults to the current UTC time,
#   evaluated at call time
# @return the result of start_time + seconds
def seconds_after_time(seconds, start_time = Time.now.utc)
  start_time + seconds
end
sort(payload) click to toggle source
# File lib/qyu/store/activerecord/adapter.rb, line 194
# Normalization hook used by compare_payloads before payloads are compared.
# NOTE(review): despite its name this currently returns the payload
# unchanged — presumably a placeholder for real normalization; confirm.
#
# @param payload payload to normalize
# @return the same payload object
def sort(payload)
  payload
end