module Postjob::CLI

rubocop:disable Lint/HandleExceptions

rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength

rubocop:disable Lint/HandleExceptions

rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength

rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength rubocop:disable Metrics/ModuleLength rubocop:disable Metrics/PerceivedComplexity rubocop:disable Metrics/ParameterLists

rubocop:disable Lint/HandleExceptions

rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength

Constants

USE_ACTIVE_RECORD

Public Instance Methods

confirm!(msg) click to toggle source
# File lib/postjob/cli/db.rb, line 44
  def confirm!(msg)
    STDERR.puts <<~TXT
      #{msg.chomp}

      Press return to continue, ^C to cancel...
    TXT
    STDIN.gets
  rescue Interrupt
    raise "Cancelled by user"
  end
cron(limit: "30", queue: nil) click to toggle source

Lists cron jobs in the system

# File lib/postjob/cli/cron.rb, line 5
def cron(limit: "30", queue: nil)
  limit = Integer(limit)
  queue = queue.split(",") if queue

  connect_to_database!

  query = ps_query(limit: limit, queue: queue, tags: nil)
  query = query.where("cron_interval IS NOT NULL")
  # query = query
  print_results query: query
end
cron_disable(workflow, *args) click to toggle source

Disable an existing cron job

# File lib/postjob/cli/cron.rb, line 36
def cron_disable(workflow, *args)
  connect_to_database!
  Postjob::Queue.disable_cron_jobs(workflow, args)
end
cron_enqueue(workflow, *args, interval:, queue: nil, tags: nil) click to toggle source

Enqueues a cron workflow

# File lib/postjob/cli/cron.rb, line 27
def cron_enqueue(workflow, *args, interval:, queue: nil, tags: nil)
  interval = Integer(interval)

  connect_to_database!

  Postjob.enqueue! workflow, *args, queue: queue, tags: parse_tags(tags), cron_interval: interval
end
cron_top(limit: "30", queue: nil) click to toggle source
# File lib/postjob/cli/cron.rb, line 17
def cron_top(limit: "30", queue: nil)
  loop do
    system "clear"
    cron limit: limit, queue: queue
    sleep 1
  end
rescue Interrupt
end
db_migrate() click to toggle source
# File lib/postjob/cli/db.rb, line 14
def db_migrate
  connect_to_database!
  Postjob::Migrations.migrate!
end
db_remigrate(force: false) click to toggle source
# File lib/postjob/cli/db.rb, line 31
  def db_remigrate(force: false)
    unless force
      confirm! <<~TXT
        Really remigrating database? This will destroy all data about postjobs!
        (To prevent the need to confirm run with '--force'.)
      TXT
    end

    connect_to_database!
    Postjob::Migrations.unmigrate!
    Postjob::Migrations.migrate!
  end
db_settings() click to toggle source

Prints all settings in the queue

# File lib/postjob/cli/db.rb, line 6
def db_settings
  connect_to_database!

  raise "No settings in this database schema" unless ::Postjob::Queue.settings?

  Simple::SQL.print "SELECT name,value FROM postjob.settings ORDER BY name"
end
db_unmigrate() click to toggle source
# File lib/postjob/cli/db.rb, line 19
  def db_unmigrate
    unless force
      confirm! <<~TXT
        Really unmigrating database? This will destroy all data about postjobs!
        (To prevent the need to confirm run with '--force'.)
      TXT
    end

    connect_to_database!
    Postjob::Migrations.unmigrate!
  end
enqueue(workflow, *args, queue: nil, tags: nil, count: "1") click to toggle source

Enqueues a workflow

Adds a workflow to the job table, with name <workflow> and the given arguments.

Note that the workflow will receive the arguments as strings and must be prepared to handle these.

# File lib/postjob/cli/enqueue.rb, line 9
def enqueue(workflow, *args, queue: nil, tags: nil, count: "1")
  count = Integer(count)

  connect_to_database!

  count.times do
    Postjob.enqueue! workflow, *args, queue: queue, tags: parse_tags(tags)
  end
end
events(limit: "30") click to toggle source

Show the latest job event

Example:

postjob events
# File lib/postjob/cli/events.rb, line 40
def events(limit: "30")
  expect! limit => /\A\d+\z/
  limit = Integer(limit)

  connect_to_database!

  query = events_query(limit: limit)

  print_results query: query
end
events_top(limit: "30") click to toggle source

Show up-to-date events information once per second

# File lib/postjob/cli/events.rb, line 52
def events_top(limit: "30")
  loop do
    system "clear"
    events(limit: limit)
    sleep 1
  end
rescue Interrupt
end
heartbeat(limit: "30") click to toggle source

Show the latest heartbeat events

# File lib/postjob/cli/heartbeat.rb, line 34
def heartbeat(limit: "30")
  expect! limit => /\A\d+\z/
  limit = Integer(limit)

  connect_to_database!

  query = heartbeat_query(limit: limit)

  Postjob.logger.info "CPU load and friends are for the last minute"
  print_results query: query
end
heartbeat_top(limit: "30") click to toggle source

Show up-to-date heartbeat information once per second

# File lib/postjob/cli/heartbeat.rb, line 47
def heartbeat_top(limit: "30")
  loop do
    system "clear"
    heartbeat(limit: limit)
    sleep 1
  end
rescue Interrupt
end
host_reset() click to toggle source

resets the host id

# File lib/postjob/cli/hosts.rb, line 67
def host_reset
  Postjob::Host.clear_storage
end
host_restart() click to toggle source

Set the host to running again

# File lib/postjob/cli/hosts.rb, line 79
def host_restart
  connect_to_database!

  Simple::SQL.ask "UPDATE postjob.hosts SET status='running' WHERE id=$1::uuid", ::Postjob.host_id
end
host_shutdown() click to toggle source

Set the host to shutdown state

# File lib/postjob/cli/hosts.rb, line 72
def host_shutdown
  connect_to_database!

  Postjob::Host.shutdown!(host_id: ::Postjob.host_id)
end
hosts(limit: "30") click to toggle source

Show hosts status

This command lists all worker_sessions currently in the system.

Example:

postjob hosts
# File lib/postjob/cli/hosts.rb, line 45
def hosts(limit: "30")
  expect! limit => /\A\d+\z/
  limit = Integer(limit)

  connect_to_database!

  query = hosts_query(limit: limit)

  print_results query: query
end
hosts_top(limit: "30") click to toggle source

Show up-to-date hosts information once per second

# File lib/postjob/cli/hosts.rb, line 57
def hosts_top(limit: "30")
  loop do
    system "clear"
    hosts(limit: limit)
    sleep 1
  end
rescue Interrupt
end
hosts_zombies() click to toggle source
# File lib/postjob/cli/hosts.rb, line 85
  def hosts_zombies
    Simple::SQL.print <<~SQL
        SELECT hosts.*
        FROM postjob.hosts hosts
        LEFT JOIN (
          SELECT id, host_id
          FROM postjob.events events
          WHERE name='heartbeat'
            AND created_at > now() at time zone 'utc' - interval '5 minutes'
        ) heartbeats ON hosts.id=heartbeats.host_id
      WHERE status IN ('running', 'shutdown')
        AND heartbeats.id IS NULL
    SQL
  end
job_enqueue(workflow, *args, queue: nil, tags: nil) click to toggle source
# File lib/postjob/cli/job.rb, line 8
def job_enqueue(workflow, *args, queue: nil, tags: nil)
  logger "The job:enqueue command is deprecated, pls use enqueue instead."
  enqueue(workflow, *args, queue: queue, tags: tags)
end
job_force(job_id) click to toggle source

runs a job as soon as possible

# File lib/postjob/cli/job.rb, line 62
  def job_force(job_id)
    connect_to_database!

    job_id = Integer(job_id)

    Simple::SQL.ask <<~SQL, job_id
      UPDATE postjob.postjobs
      SET
        next_run_at=now() at time zone 'utc'
      WHERE id = $1 AND status IN ('ready', 'err', 'sleep')
    SQL
  end
job_kill(job_id) click to toggle source

Kills a specific job

# File lib/postjob/cli/job.rb, line 45
  def job_kill(job_id)
    connect_to_database!

    job_id = Integer(job_id)

    Simple::SQL.ask <<~SQL, job_id
      UPDATE postjob.postjobs
      SET
        status='failed',
        next_run_at=null,
        error='Manually terminated',
        error_message='Manually terminated'
      WHERE id = $1 AND status IN ('ready', 'err', 'sleep')
    SQL
  end
job_reset(job_id) click to toggle source

Reset failed jobs

This resets all failed jobs within the job tree, below the passed in job id.

# File lib/postjob/cli/job.rb, line 17
  def job_reset(job_id)
    connect_to_database!

    job_id = Integer(job_id)
    full_job_id = Simple::SQL.ask "SELECT full_id FROM postjob.postjobs WHERE id=$1", job_id
    full_job_id || logger.error("No such job: #{job_id}")

    job_ids = Simple::SQL.all <<~SQL
      SELECT id FROM postjob.postjobs
      WHERE (full_id LIKE '#{full_job_id}.%' OR full_id='#{full_job_id}')
        AND status IN ('failed', 'err', 'timeout')
    SQL

    logger.warn "Affected jobs: #{job_ids.count}"
    return if job_ids.empty?

    Simple::SQL.ask <<~SQL, job_ids
      UPDATE postjob.postjobs
      SET
        status='ready', next_run_at=(now() at time zone 'utc'),
        results=null, failed_attempts=0, error=NULL, error_message=NULL, error_backtrace=NULL
      WHERE id = ANY($1);
    SQL

    logger.warn "The following jobs have been reset: #{job_ids.join(', ')}"
  end
job_resolve(token, result) click to toggle source

Resolve a job by token

# File lib/postjob/cli/job.rb, line 3
def job_resolve(token, result)
  connect_to_database!
  Postjob.resolve(token: token, result: result)
end
ps(*ids, limit: "30", tags: nil, queue: nil, only_root: nil) click to toggle source

Show job status

This command lists the statuses of all jobs that are either root jobs, i.e. enqueued workflows, or that have failed.

Example:

postjob ps --tags=foo:bar,bar:baz --limit=100

For a listing of all jobs in the system use ps:full, see 'postjob help ps:full' for details.

# File lib/postjob/cli/ps.rb, line 76
def ps(*ids, limit: "30", tags: nil, queue: nil, only_root: nil)
  expect! limit => /\A\d+\z/
  limit = Integer(limit)
  queue = queue.split(",") if queue

  connect_to_database!

  # check for timed out and zombie processes
  # ::Postjob::Queue.checkout(nil)

  unless ids.empty?
    ps_full(*ids, limit: limit, tags: tags)
    return
  end

  query = ps_query(tags: tags, limit: limit, queue: queue, only_root: only_root)
  query = query.where("root_id=id OR status NOT IN ('sleep', 'ok') OR failed_attempts > 0")

  print_results query: query
end
ps_full(*ids, limit: "30", tags: nil, queue: nil) click to toggle source
# File lib/postjob/cli/ps.rb, line 97
def ps_full(*ids, limit: "30", tags: nil, queue: nil)
  queue = queue.split(",") if queue

  connect_to_database!

  query = ps_query(tags: tags, limit: limit, queue: queue)

  unless ids.empty?
    parsed_ids = parse_ids(*ids)
    query = query.where("root_id IN (#{parsed_ids.join(',')})")
  end

  print_results query: query, on_empty: "Note that ps_full requires the **root ids**"
end
ps_result(id) click to toggle source

Print the result of a job

# File lib/postjob/cli/ps.rb, line 127
def ps_result(id)
  connect_to_database!

  scope = Simple::SQL::Scope.new("SELECT * FROM postjob.postjobs")
  scope = scope.where("postjobs.id = ANY(?)", parse_ids(id))

  job = Simple::SQL.ask(scope, into: Postjob::Job)
  raise "Job ##{id}: no such job" unless job

  if job.status == "timeout" || job.status == "failed"
    # resolve raises exceptions on failing jobs. At this point we'll just tell
    # the user to be aware of that, and let job.resolve fail below.
    logger.warn "Job failed. Details below."
  end

  result = job.resolve

  # Pending jobs don't have a result yet. Just saying..
  raise "Job ##{id}: job is still pending" if result == :pending

  # Print result. If this is a structure we pretty print, if not we
  # print verbatim.
  if result.is_a?(String) || !result.respond_to?(:empty?)
    puts result
  else
    puts JSON.pretty_generate(result)
  end
end
ps_show(id, *ids) click to toggle source

Show all information about this job

# File lib/postjob/cli/ps.rb, line 157
  def ps_show(id, *ids)
    connect_to_database!

    scope = Simple::SQL::Scope.new <<~SQL
      SELECT postjobs.*, tokens.token
      FROM postjob.postjobs AS postjobs
      LEFT JOIN postjob.tokens AS tokens ON tokens.postjob_id=postjobs.id
    SQL
    scope = scope.where("postjobs.id = ANY(?)", parse_ids(id, *ids))

    Simple::SQL.all(scope, into: Postjob::Job) do |job|
      pp job
    end
  end
ps_top(*ids, limit: "30", tags: nil, full: false, queue: nil, only_root: false) click to toggle source

Show up-to-date information once per second

# File lib/postjob/cli/ps.rb, line 113
def ps_top(*ids, limit: "30", tags: nil, full: false, queue: nil, only_root: false)
  loop do
    system "clear"
    if full
      ps_full(*ids, limit: limit, tags: tags, queue: queue)
    else
      ps(*ids, limit: limit, tags: tags, queue: queue, only_root: only_root)
    end
    sleep 1
  end
rescue Interrupt
end
queues() click to toggle source

Show queue status

# File lib/postjob/cli/queues.rb, line 5
  def queues
    connect_to_database!

    Simple::SQL.print <<~SQL
      WITH
        queues AS (
          SELECT DISTINCT(queue) AS queue FROM postjob.postjobs
        ),
        jobs AS (
          SELECT
            queue,
            SUM(IFF(status IN ('ready', 'sleep'), 1, 0))    AS waiting,
            SUM(IFF(status IN ('processing'), 1, 0))        AS processing,
            SUM(IFF(status='ok', 1, 0))                     AS succeeded,
            SUM(IFF(status IN ('failed', 'timeout'), 1, 0)) AS failed
          FROM postjob.postjobs
          GROUP BY queue
        ),
        workflows AS (
          SELECT
            queue,
            SUM(IFF(status IN ('ready', 'sleep'), 1, 0))    AS waiting,
            SUM(IFF(status IN ('processing'), 1, 0))        AS processing,
            SUM(IFF(status='ok', 1, 0))                     AS succeeded,
            SUM(IFF(status IN ('failed', 'timeout'), 1, 0)) AS failed
          FROM postjob.postjobs
          WHERE id=root_id
          GROUP BY queue
        ),
        workers AS (
          SELECT
            sq.queue,
            COUNT(*) AS count
          FROM (
            SELECT
              hosts.id,
              UNNEST(sessions.queues) AS queue,
              hosts.status,
              heartbeats.created_at AS latest_heartbeat
            FROM postjob.hosts hosts
            INNER JOIN postjob.worker_sessions sessions ON sessions.host_id=hosts.id AND sessions.status = 'running'
            LEFT JOIN (
              SELECT host_id, MAX(created_at) AS created_at
              FROM postjob.events events
              WHERE name='heartbeat'
                AND created_at > now() at time zone 'utc' - interval '5 minutes'
              GROUP BY host_id
            ) heartbeats ON hosts.id=heartbeats.host_id
            WHERE hosts.status in ('running', 'shutdown') AND heartbeats.created_at IS NOT NULL
          ) sq
          GROUP BY queue
        )
      SELECT
        queues.queue,
        COALESCE(workers.count, 0)          AS "workers",
        COALESCE(jobs.waiting, 0)           AS "jobs waiting",
        COALESCE(jobs.processing, 0)        AS "jobs processing",
        COALESCE(jobs.succeeded, 0)         AS "jobs succeeded",
        COALESCE(jobs.failed, 0)            AS "jobs failed",
        COALESCE(workflows.waiting, 0)      AS "workflows waiting",
        COALESCE(workflows.processing, 0)   AS "workflows processing",
        COALESCE(workflows.succeeded, 0)    AS "workflows succeeded",
        COALESCE(workflows.failed, 0)       AS "workflows failed"
      FROM queues
      LEFT JOIN workers USING(queue)
      LEFT JOIN jobs USING(queue)
      LEFT JOIN workflows USING(queue)
      ORDER BY queue
    SQL
  end
queues_top() click to toggle source

Show up-to-date queue information once per second

# File lib/postjob/cli/queues.rb, line 77
def queues_top
  loop do
    system "clear"
    queues
    sleep 1
  end
rescue Interrupt
end
registry() click to toggle source
# File lib/postjob/cli/registry.rb, line 2
def registry
  puts "=== Queues =========================================================="
  puts Postjob::Registry.queues.join(", ")
  puts "=== Workflows ======================================================="
  require "table_print"

  workflows_with_versions = Postjob::Registry.workflows.keys.reject { |k| k[1] == "" }
  workflows_with_versions = workflows_with_versions.sort_by { |name, _version| name }

  data = workflows_with_versions.map do |name, version|
    spec = Postjob::Registry.lookup! name: name, version: version
    {
      name: name,
      options: spec.options.inspect
    }
  end

  tp data
end
run(count: nil, queue: nil, fast: false, host_id: nil) click to toggle source

Run postjobs.

This method runs jobs as they become ready.

Parameters:

  • –count=<count> maximum number of jobs to process. Default: unlimited.

  • –queue=queue1,queue2,queue3 run only the specified queues.

  • –heartbeat=no don't start heartbeat process.

# File lib/postjob/cli/run.rb, line 47
def run(count: nil, queue: nil, fast: false, host_id: nil)
  count = Integer(count) if count
  processed = _run(count: count, queue: queue, fast: fast, host_id: host_id, heartbeat: false)

  if !count || processed < count
    # The runner has been shut down externally. Wait for interrupt.
    Postjob.logger.success "External shut down initiated."
    STDERR.puts "External shut down initiated. Press ^C to terminate process."
    STDIN.read
  end
end
run!(command, *args) click to toggle source
Calls superclass method
# File lib/postjob/cli.rb, line 13
def run!(command, *args)
  Postjob.logger = logger
  load_environment!

  super
end
run_control(host_id: nil) click to toggle source

Start the control connection

The control connection runs the heartbeat, and checks for jobs on the control queue.

# File lib/postjob/cli/run.rb, line 11
def run_control(host_id: nil)
  # By running the "control" queue this code is quite efficient, it does
  # not poll the database.
  _run(queue: "control", host_id: host_id, heartbeat: true)

  host_id ||= Postjob.host_id

  # Poll until there are no more working sessions.
  # [TODO] - improve by listening.
  wait_for_no_running_session(host_id: host_id)

  run_control_shutdown(host_id: host_id)
end
sessions(limit: "30") click to toggle source

Show sessions status

This command lists all worker sessions currently in the system.

Example:

postjob sessions
# File lib/postjob/cli/sessions.rb, line 62
def sessions(limit: "30")
  expect! limit => /\A\d+\z/
  limit = Integer(limit)

  connect_to_database!

  # check for timed out and zombie processes
  # ::Postjob::Queue.checkout(nil)

  query = sessions_query(limit: limit)

  print_results query: query
end
sessions_top(limit: "30") click to toggle source

Show up-to-date session information once per second

# File lib/postjob/cli/sessions.rb, line 77
def sessions_top(limit: "30")
  loop do
    system "clear"
    sessions(limit: limit)
    sleep 1
  end
rescue Interrupt
end
step(count: 1, queue: nil, host_id: nil) click to toggle source

Run a single job

# File lib/postjob/cli/run.rb, line 3
def step(count: 1, queue: nil, host_id: nil)
  run count: count, queue: queue, host_id: host_id, heartbeat: false
end
version() click to toggle source

Prints version info

# File lib/postjob/cli/version.rb, line 3
def version
  puts "This is postjob (ruby) #{Postjob::VERSION}"

  begin
    connect_to_database!
    puts "postjob/schema_version: #{::Postjob::Queue.version}"
  rescue StandardError
    Postjob.logger.warn "Cannot read postjob schema version. Database might not be configured or migrated."
  end
end

Private Instance Methods

_run(count: nil, queue:, fast: false, host_id:, heartbeat:) click to toggle source
# File lib/postjob/cli/run.rb, line 61
def _run(count: nil, queue:, fast: false, host_id:, heartbeat:)
  expect! Integer(host_id, 16) => 1..0xffffffff if host_id

  expect! heartbeat => [ true, false ]

  Postjob.fast_mode = (fast ? true : false)

  connect_to_database!

  if host_id
    Postjob::Host.host_id = "%08x-0000-0000-0000-000000000000" % Integer(host_id, 16)
    Postjob.logger.info "Using host_id: #{Postjob::Host.host_id}"
  end

  processed = Postjob.run(count: count, queues: queue&.split(","), heartbeat: heartbeat) do |job_id|
    logger.info "Processed job w/id #{job_id}" if job_id
  end

  logger.info "Processed #{processed} jobs"

  processed
end
connect_to_database!() click to toggle source
# File lib/postjob/cli/db.rb, line 59
def connect_to_database!
  return if @connected_to_database

  @connected_to_database = true
  if USE_ACTIVE_RECORD
    require "active_record"
    abc = ::Simple::SQL::Config.read_database_yml
    ::ActiveRecord::Base.establish_connection abc
  else
    ::Simple::SQL.connect!
  end
end
events_query(limit:) click to toggle source
# File lib/postjob/cli/events.rb, line 7
  def events_query(limit:)
    limit = Integer(limit)

    sql = <<-SQL
      SELECT
        events.id,
        events.name,
        events.postjob_id AS job_id,
        postjobs.workflow
          || (CASE WHEN postjobs.workflow_version != '' THEN '@' ELSE '' END)
          || postjobs.workflow_version
          || (CASE WHEN postjobs.workflow_method != 'run' THEN '.' || postjobs.workflow_method ELSE '' END)
          || postjobs.args AS job,
          worker_session_id,
        events.created_at
      FROM postjob.events events
      LEFT JOIN postjob.postjobs postjobs ON events.postjob_id=postjobs.id
      WHERE events.name != 'heartbeat'
    SQL

    scope = Simple::SQL::Scope.new(sql)
    scope
      .order_by("events.id DESC")
      .paginate(per: limit, page: 1)
  end
heartbeat_query(limit:) click to toggle source
# File lib/postjob/cli/heartbeat.rb, line 6
  def heartbeat_query(limit:)
    limit = Integer(limit)

    sql = <<-SQL
      SELECT
        name,
        postjob_id AS job_id,
        events.host_id,
        (events.attributes->>'uptime')::interval AS uptime,
        to_char((events.attributes->>'cpu_load_1min')::float, '99D99') AS cpu_load,
        events.attributes->>'net_in_1min' AS net_in,
        events.attributes->>'net_out_1min' AS net_out,
        events.attributes->>'net_errors_1min' AS net_errors,
        now() at time zone 'utc' - events.created_at AS age
      FROM postjob.events events
      LEFT JOIN postjob.worker_sessions worker_sessions ON events.worker_session_id=worker_sessions.id
      WHERE events.name = 'heartbeat'
    SQL

    scope = Simple::SQL::Scope.new(sql)
    scope
      .order_by("events.id DESC")
      .paginate(per: limit, page: 1)
  end
hosts_query(limit:) click to toggle source
# File lib/postjob/cli/hosts.rb, line 7
  def hosts_query(limit:)
    limit = Integer(limit)

    sql = <<-SQL
      SELECT
        hosts.id,
        hosts.created_at,
        hosts.status,
        heartbeats.attributes AS heartbeat,
        heartbeats.created_at AS heartbeat_created_at,
        hosts.attributes
      FROM postjob.hosts hosts
      LEFT JOIN (
        SELECT
          host_id,
          attributes,
          created_at,
          rank() OVER (PARTITION BY host_id ORDER BY created_at DESC) AS rank
        FROM postjob.events events
        WHERE name='heartbeat'
      ) heartbeats ON heartbeats.host_id=hosts.id AND rank = 1
    SQL

    scope = Simple::SQL::Scope.new(sql)
    scope
      .order_by("hosts.created_at DESC NULLS LAST")
      .paginate(per: limit, page: 1)
  end
load_environment(path) click to toggle source
# File lib/postjob/cli.rb, line 22
def load_environment(path)
  return unless File.exist?(path)

  logger.debug "#{path}: loading Postjob configuration"
  load path
end
load_environment!() click to toggle source
# File lib/postjob/cli.rb, line 29
def load_environment!
  load_environment("config/environment.rb")
  load_environment("config/postjob.rb")
end
parse_ids(*ids) click to toggle source
# File lib/postjob/cli/helpers.rb, line 6
def parse_ids(*ids)
  ids.map do |s|
    s = s.gsub(/.*\./, "")
    Integer(s)
  end.uniq
end
parse_tags(tags) click to toggle source

parses “foo:bar,baz:quibble” into { “foo” => “bar”, “baz” => “quibble”}

# File lib/postjob/cli/job.rb, line 78
def parse_tags(tags)
  return nil unless tags
  tags.split(",").inject({}) do |hsh, tag|
    expect! tag => /\A[^:]+:[^:]+\z/
    k, v = tag.split(":", 2)
    hsh.update k => v
  end
end
print_results(query:, on_empty: nil, title: nil) click to toggle source
ps_query(tags:, limit:, queue:, only_root: false) click to toggle source
# File lib/postjob/cli/ps.rb, line 10
  def ps_query(tags:, limit:, queue:, only_root: false)
    limit = Integer(limit)

    sql = <<-SQL
      SELECT
        postjobs.id,
        full_id,
        postjobs.queue,
        workflow
          || (CASE WHEN workflow_version != '' THEN '@' ELSE '' END)
          || workflow_version
          || (CASE WHEN workflow_method != 'run' THEN '.' || workflow_method ELSE '' END)
          || args AS job,
        CASE
          WHEN (status = 'err') THEN 'err(' || failed_attempts || '/' || max_attempts || ')'
          WHEN (status = 'failed') THEN 'fail(' || failed_attempts || '/' || max_attempts || ')'
          ELSE status::varchar
          END AS status,
        error,
        COALESCE((results->0)::varchar, error_message) AS result,
        cron_interval AS cron,
        iff(is_sticky, COALESCE(substring(sticky_host_id::varchar for 9) || '...', 'yes')::varchar, 'no'::varchar) AS sticky,
        iff(is_greedy, 'yes', 'no'::varchar) AS greedy,
        next_run_at,
        -- to_char(EXTRACT(EPOCH FROM (next_run_at - now() at time zone 'utc')), '999999999.99') AS next_run_in,
        to_char(
          EXTRACT(EPOCH FROM
          (
            COALESCE(
              (SELECT MIN(created_at) FROM postjob.events WHERE postjob_id=postjobs.id AND name IN ('ok', 'failed')),
              now() at time zone 'utc'
            )
            -
            (SELECT MIN(created_at) FROM postjob.events WHERE postjob_id=postjobs.id AND name IN ('processing'))
          )
          ), '999999999.99'
        ) AS processing,

        to_char(EXTRACT(EPOCH FROM (now() at time zone 'utc' - postjobs.created_at)), '999999999.99') AS age,
        tags
      FROM postjob.postjobs AS postjobs
    SQL

    scope = Simple::SQL::Scope.new(sql)
    scope = scope.where("tags @> ?", Postjob::Queue::Encoder.encode(parse_tags(tags))) if tags
    scope = scope.where(queue: queue) if queue
    scope = scope.where("id = root_id") if only_root

    scope
      .paginate(per: limit, page: 1)
      .order_by("root_id DESC, id ASC")
  end
run_control_shutdown(host_id: nil) click to toggle source

This is called after the control connection did shut down

This method can be reimplemented in a plugin to allow shutting down worker machines.

# File lib/postjob/cli/run.rb, line 31
def run_control_shutdown(host_id: nil)
  Postjob.logger.success "postjob control:shutdown host_id: #{host_id}"
end
sessions_query(limit:) click to toggle source
# File lib/postjob/cli/sessions.rb, line 7
  def sessions_query(limit:)
    limit = Integer(limit)

    sql = <<-SQL
      SELECT
        worker_sessions.id,
        (substring(worker_sessions.host_id::varchar for 9) || '...') AS host_id,
        worker_sessions.status,
        array_to_string(worker_sessions.queues, ', ') AS queues,
        worker_sessions.client_socket,
        worker_sessions.workflows,
        worker_sessions.created_at,
        job_event.name AS event_name,
        job_event.created_at AS event_created_at,
        heartbeat.attributes AS heartbeat,
        heartbeat.created_at AS heartbeat_created_at
      FROM postjob.worker_sessions AS worker_sessions
      LEFT JOIN (
        SELECT
          worker_sessions.id,
          MAX(events.id) AS event_id
        FROM postjob.worker_sessions
        LEFT JOIN postjob.events events ON events.worker_session_id=worker_sessions.id
        WHERE events.name != 'heartbeat'
        GROUP BY worker_sessions.id
      ) last_job_event ON last_job_event.id=worker_sessions.id
      LEFT JOIN postjob.events job_event ON job_event.id=last_job_event.event_id
      LEFT JOIN (
        SELECT
          worker_sessions.id,
          MAX(events.id) AS event_id
        FROM postjob.worker_sessions
        LEFT JOIN postjob.events events ON events.worker_session_id=worker_sessions.id
        WHERE events.name = 'heartbeat'
        GROUP BY worker_sessions.id
      ) last_heartbeat ON last_heartbeat.id=worker_sessions.id
      LEFT JOIN postjob.events heartbeat ON heartbeat.id=last_heartbeat.event_id
    SQL

    scope = Simple::SQL::Scope.new(sql)
    scope = scope.where("worker_sessions.id != '00000000-0000-0000-0000-000000000000'::uuid")
    scope
      .paginate(per: limit, page: 1)
      .order_by("heartbeat_created_at DESC NULLS LAST")
  end
wait_for_no_running_session(host_id:) click to toggle source
# File lib/postjob/cli/run.rb, line 84
def wait_for_no_running_session(host_id:)
  Postjob.logger.debug "Waiting for shutdown"
  loop do
    count = Simple::SQL.ask "SELECT COUNT (*) FROM postjob.worker_sessions WHERE id=$1", host_id
    Postjob.logger.info "#{host_id}: #{count} running sessions"

    break if count == 0
    sleep 0.2
  end

  Postjob.logger.info "#{host_id}: no more running sessions"
end