module Postjob::CLI
rubocop:disable Lint/HandleExceptions
rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength
rubocop:disable Lint/HandleExceptions
rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength
rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength rubocop:disable Metrics/ModuleLength rubocop:disable Metrics/PerceivedComplexity rubocop:disable Metrics/ParameterLists
rubocop:disable Lint/HandleExceptions
rubocop:disable Lint/HandleExceptions rubocop:disable Metrics/MethodLength
Constants
- USE_ACTIVE_RECORD
Public Instance Methods
# File lib/postjob/cli/db.rb, line 44 def confirm!(msg) STDERR.puts <<~TXT #{msg.chomp} Press return to continue, ^C to cancel... TXT STDIN.gets rescue Interrupt raise "Cancelled by user" end
Lists cron jobs in the system
# File lib/postjob/cli/cron.rb, line 5 def cron(limit: "30", queue: nil) limit = Integer(limit) queue = queue.split(",") if queue connect_to_database! query = ps_query(limit: limit, queue: queue, tags: nil) query = query.where("cron_interval IS NOT NULL") # query = query print_results query: query end
Disable an existing cron job
# File lib/postjob/cli/cron.rb, line 36 def cron_disable(workflow, *args) connect_to_database! Postjob::Queue.disable_cron_jobs(workflow, args) end
Enqueues a cron workflow
# File lib/postjob/cli/cron.rb, line 27 def cron_enqueue(workflow, *args, interval:, queue: nil, tags: nil) interval = Integer(interval) connect_to_database! Postjob.enqueue! workflow, *args, queue: queue, tags: parse_tags(tags), cron_interval: interval end
# File lib/postjob/cli/cron.rb, line 17 def cron_top(limit: "30", queue: nil) loop do system "clear" cron limit: limit, queue: queue sleep 1 end rescue Interrupt end
# File lib/postjob/cli/db.rb, line 14 def db_migrate connect_to_database! Postjob::Migrations.migrate! end
# File lib/postjob/cli/db.rb, line 31 def db_remigrate(force: false) unless force confirm! <<~TXT Really remigrating database? This will destroy all data about postjobs! (To prevent the need to confirm run with '--force'.) TXT end connect_to_database! Postjob::Migrations.unmigrate! Postjob::Migrations.migrate! end
Prints all settings in the queue
# File lib/postjob/cli/db.rb, line 6 def db_settings connect_to_database! raise "No settings in this database schema" unless ::Postjob::Queue.settings? Simple::SQL.print "SELECT name,value FROM postjob.settings ORDER BY name" end
# File lib/postjob/cli/db.rb, line 19 def db_unmigrate unless force confirm! <<~TXT Really unmigrating database? This will destroy all data about postjobs! (To prevent the need to confirm run with '--force'.) TXT end connect_to_database! Postjob::Migrations.unmigrate! end
Enqueues a workflow
Adds a workflow to the job table, with name <workflow> and the given arguments.
Note that the workflow will receive the arguments as strings and must be prepared to handle these.
# File lib/postjob/cli/enqueue.rb, line 9 def enqueue(workflow, *args, queue: nil, tags: nil, count: "1") count = Integer(count) connect_to_database! count.times do Postjob.enqueue! workflow, *args, queue: queue, tags: parse_tags(tags) end end
Show the latest job event
Example:
postjob events
# File lib/postjob/cli/events.rb, line 40 def events(limit: "30") expect! limit => /\A\d+\z/ limit = Integer(limit) connect_to_database! query = events_query(limit: limit) print_results query: query end
Show up-to-date events information once per second
# File lib/postjob/cli/events.rb, line 52 def events_top(limit: "30") loop do system "clear" events(limit: limit) sleep 1 end rescue Interrupt end
Show the latest heartbeat events
# File lib/postjob/cli/heartbeat.rb, line 34 def heartbeat(limit: "30") expect! limit => /\A\d+\z/ limit = Integer(limit) connect_to_database! query = heartbeat_query(limit: limit) Postjob.logger.info "CPU load and friends are for the last minute" print_results query: query end
Show up-to-date heartbeat information once per second
# File lib/postjob/cli/heartbeat.rb, line 47 def heartbeat_top(limit: "30") loop do system "clear" heartbeat(limit: limit) sleep 1 end rescue Interrupt end
resets the host id
# File lib/postjob/cli/hosts.rb, line 67 def host_reset Postjob::Host.clear_storage end
Set the host to running again
# File lib/postjob/cli/hosts.rb, line 79 def host_restart connect_to_database! Simple::SQL.ask "UPDATE postjob.hosts SET status='running' WHERE id=$1::uuid", ::Postjob.host_id end
Set the host to shutdown state
# File lib/postjob/cli/hosts.rb, line 72 def host_shutdown connect_to_database! Postjob::Host.shutdown!(host_id: ::Postjob.host_id) end
Show hosts status
This command lists all worker_sessions currently in the system.
Example:
postjob hosts
# File lib/postjob/cli/hosts.rb, line 45 def hosts(limit: "30") expect! limit => /\A\d+\z/ limit = Integer(limit) connect_to_database! query = hosts_query(limit: limit) print_results query: query end
Show up-to-date hosts information once per second
# File lib/postjob/cli/hosts.rb, line 57 def hosts_top(limit: "30") loop do system "clear" hosts(limit: limit) sleep 1 end rescue Interrupt end
# File lib/postjob/cli/hosts.rb, line 85 def hosts_zombies Simple::SQL.print <<~SQL SELECT hosts.* FROM postjob.hosts hosts LEFT JOIN ( SELECT id, host_id FROM postjob.events events WHERE name='heartbeat' AND created_at > now() at time zone 'utc' - interval '5 minutes' ) heartbeats ON hosts.id=heartbeats.host_id WHERE status IN ('running', 'shutdown') AND heartbeats.id IS NULL SQL end
# File lib/postjob/cli/job.rb, line 8 def job_enqueue(workflow, *args, queue: nil, tags: nil) logger "The job:enqueue command is deprecated, pls use enqueue instead." enqueue(workflow, *args, queue: queue, tags: tags) end
runs a job as soon as possible
# File lib/postjob/cli/job.rb, line 62 def job_force(job_id) connect_to_database! job_id = Integer(job_id) Simple::SQL.ask <<~SQL, job_id UPDATE postjob.postjobs SET next_run_at=now() at time zone 'utc' WHERE id = $1 AND status IN ('ready', 'err', 'sleep') SQL end
Kills a specific job
# File lib/postjob/cli/job.rb, line 45 def job_kill(job_id) connect_to_database! job_id = Integer(job_id) Simple::SQL.ask <<~SQL, job_id UPDATE postjob.postjobs SET status='failed', next_run_at=null, error='Manually terminated', error_message='Manually terminated' WHERE id = $1 AND status IN ('ready', 'err', 'sleep') SQL end
Reset failed jobs
This resets all failed jobs within the job tree, below the passed in job id.
# File lib/postjob/cli/job.rb, line 17 def job_reset(job_id) connect_to_database! job_id = Integer(job_id) full_job_id = Simple::SQL.ask "SELECT full_id FROM postjob.postjobs WHERE id=$1", job_id full_job_id || logger.error("No such job: #{job_id}") job_ids = Simple::SQL.all <<~SQL SELECT id FROM postjob.postjobs WHERE (full_id LIKE '#{full_job_id}.%' OR full_id='#{full_job_id}') AND status IN ('failed', 'err', 'timeout') SQL logger.warn "Affected jobs: #{job_ids.count}" return if job_ids.empty? Simple::SQL.ask <<~SQL, job_ids UPDATE postjob.postjobs SET status='ready', next_run_at=(now() at time zone 'utc'), results=null, failed_attempts=0, error=NULL, error_message=NULL, error_backtrace=NULL WHERE id = ANY($1); SQL logger.warn "The following jobs have been reset: #{job_ids.join(', ')}" end
Resolve a job by token
# File lib/postjob/cli/job.rb, line 3 def job_resolve(token, result) connect_to_database! Postjob.resolve(token: token, result: result) end
Show job status
This command lists the statuses of all jobs that are either root jobs, i.e. enqueued workflows, or that have failed.
Example:
postjob ps --tags=foo:bar,bar:baz --limit=100
For a listing of all jobs in the system use ps:full, see 'postjob help ps:full' for details.
# File lib/postjob/cli/ps.rb, line 76 def ps(*ids, limit: "30", tags: nil, queue: nil, only_root: nil) expect! limit => /\A\d+\z/ limit = Integer(limit) queue = queue.split(",") if queue connect_to_database! # check for timed out and zombie processes # ::Postjob::Queue.checkout(nil) unless ids.empty? ps_full(*ids, limit: limit, tags: tags) return end query = ps_query(tags: tags, limit: limit, queue: queue, only_root: only_root) query = query.where("root_id=id OR status NOT IN ('sleep', 'ok') OR failed_attempts > 0") print_results query: query end
# File lib/postjob/cli/ps.rb, line 97 def ps_full(*ids, limit: "30", tags: nil, queue: nil) queue = queue.split(",") if queue connect_to_database! query = ps_query(tags: tags, limit: limit, queue: queue) unless ids.empty? parsed_ids = parse_ids(*ids) query = query.where("root_id IN (#{parsed_ids.join(',')})") end print_results query: query, on_empty: "Note that ps_full requires the **root ids**" end
Print the result of a job
# File lib/postjob/cli/ps.rb, line 127 def ps_result(id) connect_to_database! scope = Simple::SQL::Scope.new("SELECT * FROM postjob.postjobs") scope = scope.where("postjobs.id = ANY(?)", parse_ids(id)) job = Simple::SQL.ask(scope, into: Postjob::Job) raise "Job ##{id}: no such job" unless job if job.status == "timeout" || job.status == "failed" # resolve raises exceptions on failing jobs. At this point we'll just tell # the user to be aware of that, and let job.resolve fail below. logger.warn "Job failed. Details below." end result = job.resolve # Pending jobs don't have a result yet. Just saying.. raise "Job ##{id}: job is still pending" if result == :pending # Print result. If this is a structure we pretty print, if not we # print verbatim. if result.is_a?(String) || !result.respond_to?(:empty?) puts result else puts JSON.pretty_generate(result) end end
Show all information about this job
# File lib/postjob/cli/ps.rb, line 157 def ps_show(id, *ids) connect_to_database! scope = Simple::SQL::Scope.new <<~SQL SELECT postjobs.*, tokens.token FROM postjob.postjobs AS postjobs LEFT JOIN postjob.tokens AS tokens ON tokens.postjob_id=postjobs.id SQL scope = scope.where("postjobs.id = ANY(?)", parse_ids(id, *ids)) Simple::SQL.all(scope, into: Postjob::Job) do |job| pp job end end
Show up-to-date information once per second
# File lib/postjob/cli/ps.rb, line 113 def ps_top(*ids, limit: "30", tags: nil, full: false, queue: nil, only_root: false) loop do system "clear" if full ps_full(*ids, limit: limit, tags: tags, queue: queue) else ps(*ids, limit: limit, tags: tags, queue: queue, only_root: only_root) end sleep 1 end rescue Interrupt end
Show queue status
# File lib/postjob/cli/queues.rb, line 5 def queues connect_to_database! Simple::SQL.print <<~SQL WITH queues AS ( SELECT DISTINCT(queue) AS queue FROM postjob.postjobs ), jobs AS ( SELECT queue, SUM(IFF(status IN ('ready', 'sleep'), 1, 0)) AS waiting, SUM(IFF(status IN ('processing'), 1, 0)) AS processing, SUM(IFF(status='ok', 1, 0)) AS succeeded, SUM(IFF(status IN ('failed', 'timeout'), 1, 0)) AS failed FROM postjob.postjobs GROUP BY queue ), workflows AS ( SELECT queue, SUM(IFF(status IN ('ready', 'sleep'), 1, 0)) AS waiting, SUM(IFF(status IN ('processing'), 1, 0)) AS processing, SUM(IFF(status='ok', 1, 0)) AS succeeded, SUM(IFF(status IN ('failed', 'timeout'), 1, 0)) AS failed FROM postjob.postjobs WHERE id=root_id GROUP BY queue ), workers AS ( SELECT sq.queue, COUNT(*) AS count FROM ( SELECT hosts.id, UNNEST(sessions.queues) AS queue, hosts.status, heartbeats.created_at AS latest_heartbeat FROM postjob.hosts hosts INNER JOIN postjob.worker_sessions sessions ON sessions.host_id=hosts.id AND sessions.status = 'running' LEFT JOIN ( SELECT host_id, MAX(created_at) AS created_at FROM postjob.events events WHERE name='heartbeat' AND created_at > now() at time zone 'utc' - interval '5 minutes' GROUP BY host_id ) heartbeats ON hosts.id=heartbeats.host_id WHERE hosts.status in ('running', 'shutdown') AND heartbeats.created_at IS NOT NULL ) sq GROUP BY queue ) SELECT queues.queue, COALESCE(workers.count, 0) AS "workers", COALESCE(jobs.waiting, 0) AS "jobs waiting", COALESCE(jobs.processing, 0) AS "jobs processing", COALESCE(jobs.succeeded, 0) AS "jobs succeeded", COALESCE(jobs.failed, 0) AS "jobs failed", COALESCE(workflows.waiting, 0) AS "workflows waiting", COALESCE(workflows.processing, 0) AS "workflows processing", COALESCE(workflows.succeeded, 0) AS "workflows succeeded", COALESCE(workflows.failed, 0) AS "workflows failed" FROM queues LEFT JOIN workers USING(queue) LEFT JOIN jobs USING(queue) LEFT JOIN workflows USING(queue) ORDER BY queue SQL end
Show up-to-date queue information once per second
# File lib/postjob/cli/queues.rb, line 77 def queues_top loop do system "clear" queues sleep 1 end rescue Interrupt end
# File lib/postjob/cli/registry.rb, line 2 def registry puts "=== Queues ==========================================================" puts Postjob::Registry.queues.join(", ") puts "=== Workflows =======================================================" require "table_print" workflows_with_versions = Postjob::Registry.workflows.keys.reject { |k| k[1] == "" } workflows_with_versions = workflows_with_versions.sort_by { |name, _version| name } data = workflows_with_versions.map do |name, version| spec = Postjob::Registry.lookup! name: name, version: version { name: name, options: spec.options.inspect } end tp data end
Run postjobs.
This method runs jobs as they become ready.
Parameters:
-
–count=<count> maximum number of jobs to process. Default: unlimited.
-
–queue=queue1,queue2,queue3 run only the specified queues.
-
–heartbeat=no don't start heartbeat process.
# File lib/postjob/cli/run.rb, line 47 def run(count: nil, queue: nil, fast: false, host_id: nil) count = Integer(count) if count processed = _run(count: count, queue: queue, fast: fast, host_id: host_id, heartbeat: false) if !count || processed < count # The runner has been shut down externally. Wait for interrupt. Postjob.logger.success "External shut down initiated." STDERR.puts "External shut down initiated. Press ^C to terminate process." STDIN.read end end
# File lib/postjob/cli.rb, line 13 def run!(command, *args) Postjob.logger = logger load_environment! super end
Start the control connection
The control connection runs the heartbeat, and checks for jobs on the control queue.
# File lib/postjob/cli/run.rb, line 11 def run_control(host_id: nil) # By running the "control" queue this code is quite efficient, it does # not poll the database. _run(queue: "control", host_id: host_id, heartbeat: true) host_id ||= Postjob.host_id # Poll until there are no more working sessions. # [TODO] - improve by listening. wait_for_no_running_session(host_id: host_id) run_control_shutdown(host_id: host_id) end
Show sessions status
This command lists all worker sessions currently in the system.
Example:
postjob sessions
# File lib/postjob/cli/sessions.rb, line 62 def sessions(limit: "30") expect! limit => /\A\d+\z/ limit = Integer(limit) connect_to_database! # check for timed out and zombie processes # ::Postjob::Queue.checkout(nil) query = sessions_query(limit: limit) print_results query: query end
Show up-to-date session information once per second
# File lib/postjob/cli/sessions.rb, line 77 def sessions_top(limit: "30") loop do system "clear" sessions(limit: limit) sleep 1 end rescue Interrupt end
Run a single job
# File lib/postjob/cli/run.rb, line 3 def step(count: 1, queue: nil, host_id: nil) run count: count, queue: queue, host_id: host_id, heartbeat: false end
Prints version info
# File lib/postjob/cli/version.rb, line 3 def version puts "This is postjob (ruby) #{Postjob::VERSION}" begin connect_to_database! puts "postjob/schema_version: #{::Postjob::Queue.version}" rescue StandardError Postjob.logger.warn "Cannot read postjob schema version. Database might not be configured or migrated." end end
Private Instance Methods
# File lib/postjob/cli/run.rb, line 61 def _run(count: nil, queue:, fast: false, host_id:, heartbeat:) expect! Integer(host_id, 16) => 1..0xffffffff if host_id expect! heartbeat => [ true, false ] Postjob.fast_mode = (fast ? true : false) connect_to_database! if host_id Postjob::Host.host_id = "%08x-0000-0000-0000-000000000000" % Integer(host_id, 16) Postjob.logger.info "Using host_id: #{Postjob::Host.host_id}" end processed = Postjob.run(count: count, queues: queue&.split(","), heartbeat: heartbeat) do |job_id| logger.info "Processed job w/id #{job_id}" if job_id end logger.info "Processed #{processed} jobs" processed end
# File lib/postjob/cli/db.rb, line 59 def connect_to_database! return if @connected_to_database @connected_to_database = true if USE_ACTIVE_RECORD require "active_record" abc = ::Simple::SQL::Config.read_database_yml ::ActiveRecord::Base.establish_connection abc else ::Simple::SQL.connect! end end
# File lib/postjob/cli/events.rb, line 7 def events_query(limit:) limit = Integer(limit) sql = <<-SQL SELECT events.id, events.name, events.postjob_id AS job_id, postjobs.workflow || (CASE WHEN postjobs.workflow_version != '' THEN '@' ELSE '' END) || postjobs.workflow_version || (CASE WHEN postjobs.workflow_method != 'run' THEN '.' || postjobs.workflow_method ELSE '' END) || postjobs.args AS job, worker_session_id, events.created_at FROM postjob.events events LEFT JOIN postjob.postjobs postjobs ON events.postjob_id=postjobs.id WHERE events.name != 'heartbeat' SQL scope = Simple::SQL::Scope.new(sql) scope .order_by("events.id DESC") .paginate(per: limit, page: 1) end
# File lib/postjob/cli/heartbeat.rb, line 6 def heartbeat_query(limit:) limit = Integer(limit) sql = <<-SQL SELECT name, postjob_id AS job_id, events.host_id, (events.attributes->>'uptime')::interval AS uptime, to_char((events.attributes->>'cpu_load_1min')::float, '99D99') AS cpu_load, events.attributes->>'net_in_1min' AS net_in, events.attributes->>'net_out_1min' AS net_out, events.attributes->>'net_errors_1min' AS net_errors, now() at time zone 'utc' - events.created_at AS age FROM postjob.events events LEFT JOIN postjob.worker_sessions worker_sessions ON events.worker_session_id=worker_sessions.id WHERE events.name = 'heartbeat' SQL scope = Simple::SQL::Scope.new(sql) scope .order_by("events.id DESC") .paginate(per: limit, page: 1) end
# File lib/postjob/cli/hosts.rb, line 7 def hosts_query(limit:) limit = Integer(limit) sql = <<-SQL SELECT hosts.id, hosts.created_at, hosts.status, heartbeats.attributes AS heartbeat, heartbeats.created_at AS heartbeat_created_at, hosts.attributes FROM postjob.hosts hosts LEFT JOIN ( SELECT host_id, attributes, created_at, rank() OVER (PARTITION BY host_id ORDER BY created_at DESC) AS rank FROM postjob.events events WHERE name='heartbeat' ) heartbeats ON heartbeats.host_id=hosts.id AND rank = 1 SQL scope = Simple::SQL::Scope.new(sql) scope .order_by("hosts.created_at DESC NULLS LAST") .paginate(per: limit, page: 1) end
# File lib/postjob/cli.rb, line 22 def load_environment(path) return unless File.exist?(path) logger.debug "#{path}: loading Postjob configuration" load path end
# File lib/postjob/cli.rb, line 29 def load_environment! load_environment("config/environment.rb") load_environment("config/postjob.rb") end
# File lib/postjob/cli/helpers.rb, line 6 def parse_ids(*ids) ids.map do |s| s = s.gsub(/.*\./, "") Integer(s) end.uniq end
# File lib/postjob/cli/helpers.rb, line 13 def print_results(query:, on_empty: nil, title: nil) if title puts "\n=== #{title} =======================================\n\n" end records = Simple::SQL.print(query) if records.total_count && records.total_count > records.length logger.warn "Output limited up to limit #{records.length}. Use the --limit=<NN> command line option for a different limit." end if records.empty? && on_empty logger.warn(on_empty) end end
# File lib/postjob/cli/ps.rb, line 10 def ps_query(tags:, limit:, queue:, only_root: false) limit = Integer(limit) sql = <<-SQL SELECT postjobs.id, full_id, postjobs.queue, workflow || (CASE WHEN workflow_version != '' THEN '@' ELSE '' END) || workflow_version || (CASE WHEN workflow_method != 'run' THEN '.' || workflow_method ELSE '' END) || args AS job, CASE WHEN (status = 'err') THEN 'err(' || failed_attempts || '/' || max_attempts || ')' WHEN (status = 'failed') THEN 'fail(' || failed_attempts || '/' || max_attempts || ')' ELSE status::varchar END AS status, error, COALESCE((results->0)::varchar, error_message) AS result, cron_interval AS cron, iff(is_sticky, COALESCE(substring(sticky_host_id::varchar for 9) || '...', 'yes')::varchar, 'no'::varchar) AS sticky, iff(is_greedy, 'yes', 'no'::varchar) AS greedy, next_run_at, -- to_char(EXTRACT(EPOCH FROM (next_run_at - now() at time zone 'utc')), '999999999.99') AS next_run_in, to_char( EXTRACT(EPOCH FROM ( COALESCE( (SELECT MIN(created_at) FROM postjob.events WHERE postjob_id=postjobs.id AND name IN ('ok', 'failed')), now() at time zone 'utc' ) - (SELECT MIN(created_at) FROM postjob.events WHERE postjob_id=postjobs.id AND name IN ('processing')) ) ), '999999999.99' ) AS processing, to_char(EXTRACT(EPOCH FROM (now() at time zone 'utc' - postjobs.created_at)), '999999999.99') AS age, tags FROM postjob.postjobs AS postjobs SQL scope = Simple::SQL::Scope.new(sql) scope = scope.where("tags @> ?", Postjob::Queue::Encoder.encode(parse_tags(tags))) if tags scope = scope.where(queue: queue) if queue scope = scope.where("id = root_id") if only_root scope .paginate(per: limit, page: 1) .order_by("root_id DESC, id ASC") end
This is called after the control connection did shut down
This method can be reimplemented in a plugin to allow shutting down worker machines.
# File lib/postjob/cli/run.rb, line 31 def run_control_shutdown(host_id: nil) Postjob.logger.success "postjob control:shutdown host_id: #{host_id}" end
# File lib/postjob/cli/sessions.rb, line 7 def sessions_query(limit:) limit = Integer(limit) sql = <<-SQL SELECT worker_sessions.id, (substring(worker_sessions.host_id::varchar for 9) || '...') AS host_id, worker_sessions.status, array_to_string(worker_sessions.queues, ', ') AS queues, worker_sessions.client_socket, worker_sessions.workflows, worker_sessions.created_at, job_event.name AS event_name, job_event.created_at AS event_created_at, heartbeat.attributes AS heartbeat, heartbeat.created_at AS heartbeat_created_at FROM postjob.worker_sessions AS worker_sessions LEFT JOIN ( SELECT worker_sessions.id, MAX(events.id) AS event_id FROM postjob.worker_sessions LEFT JOIN postjob.events events ON events.worker_session_id=worker_sessions.id WHERE events.name != 'heartbeat' GROUP BY worker_sessions.id ) last_job_event ON last_job_event.id=worker_sessions.id LEFT JOIN postjob.events job_event ON job_event.id=last_job_event.event_id LEFT JOIN ( SELECT worker_sessions.id, MAX(events.id) AS event_id FROM postjob.worker_sessions LEFT JOIN postjob.events events ON events.worker_session_id=worker_sessions.id WHERE events.name = 'heartbeat' GROUP BY worker_sessions.id ) last_heartbeat ON last_heartbeat.id=worker_sessions.id LEFT JOIN postjob.events heartbeat ON heartbeat.id=last_heartbeat.event_id SQL scope = Simple::SQL::Scope.new(sql) scope = scope.where("worker_sessions.id != '00000000-0000-0000-0000-000000000000'::uuid") scope .paginate(per: limit, page: 1) .order_by("heartbeat_created_at DESC NULLS LAST") end
# File lib/postjob/cli/run.rb, line 84 def wait_for_no_running_session(host_id:) Postjob.logger.debug "Waiting for shutdown" loop do count = Simple::SQL.ask "SELECT COUNT (*) FROM postjob.worker_sessions WHERE id=$1", host_id Postjob.logger.info "#{host_id}: #{count} running sessions" break if count == 0 sleep 0.2 end Postjob.logger.info "#{host_id}: no more running sessions" end