module Postjob
Constants
- CRON_INTERVAL_MIN
- DEFAULT_QUEUE
- VERSION
Attributes
Public Instance Methods
  # File lib/postjob.rb, line 263
  def current_session?
    # true once a worker session has been started
    !@worker_session.nil?
  end
This method connects to the queue. This means it registers as a new worker_session, if there was no worker_session yet.
  # File lib/postjob.rb, line 269
  def current_session_id
    raise("worker_session hasn't been started yet.") unless @worker_session
    @worker_session.id
  end
Enqueues a workflow.
Options include:
- version
- max_attempts
- timeout
- sticky
- cron_interval
- queue
Returns the id of the enqueued job.
  # File lib/postjob.rb, line 62
  def enqueue!(workflow, *args, queue: nil,
                                parent_id: nil,
                                max_attempts: nil,
                                timeout: nil,
                                version: nil,
                                tags: nil,
                                cron_interval: nil,
                                sticky: nil)
    expect! queue => [nil, String]
    expect! workflow => String
    expect! parent_id => [nil, Integer]
    expect! max_attempts => [nil, Integer]
    expect! timeout => [nil, Numeric]
    expect! tags => [nil, Hash]
    expect! sticky => [nil, false, true]

    # -- prepare arguments ----------------------------------------------------
    if workflow == "__manual__" && max_attempts != 1
      if parent_id
        Postjob.logger.info "Job ##{parent_id} adjusting max_attempts of '__manual__' child job"
      else
        Postjob.logger.info "Adjusting max_attempts of '__manual__' root job"
      end

      max_attempts = 1
    end

    # -- fetch defaults from registry -----------------------------------------
    spec = Postjob::Registry[workflow]
    if spec
      max_attempts = spec.options.max_attempts if max_attempts.nil?
      timeout = spec.options.timeout if timeout.nil?
      sticky = spec.options.sticky if sticky.nil?
      cron_interval = spec.options.cron_interval if cron_interval.nil?
      queue = spec.options.queue if queue.nil?
      greedy = spec.options.greedy
    end

    if cron_interval && cron_interval < CRON_INTERVAL_MIN
      raise "cron interval must be at least #{CRON_INTERVAL_MIN} seconds"
    end

    # -- disable existing cron jobs -------------------------------------------
    if cron_interval
      Queue.disable_cron_jobs(workflow, args)
    end

    # -- enqueue jobs ---------------------------------------------------------
    tags = stringify_hash(tags) if tags

    session_id = current_session_id if current_session?
    session_id ||= "00000000-0000-0000-0000-000000000000"

    job = Queue.enqueue_job session_id, workflow, *args, queue: queue,
                                                         parent_id: parent_id,
                                                         max_attempts: max_attempts,
                                                         timeout: timeout,
                                                         tags: tags,
                                                         version: version,
                                                         cron_interval: cron_interval,
                                                         sticky: sticky,
                                                         greedy: greedy

    logger.info "Generated process #{job}"
    job.id
  end
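The way enqueue! merges explicit arguments with registry defaults can be illustrated in isolation. The following is a minimal sketch, not Postjob's implementation: SketchOptions, resolve_options and the value of CRON_INTERVAL_MIN_SKETCH are hypothetical stand-ins chosen for illustration, and no database or registry is involved.

```ruby
# Hypothetical stand-ins for illustration; none of these names exist in Postjob.
CRON_INTERVAL_MIN_SKETCH = 300 # assumed value, the real constant may differ

SketchOptions = Struct.new(:max_attempts, :timeout, :queue, :cron_interval,
                           keyword_init: true)

# Explicit (non-nil) arguments win; registered defaults fill in the rest.
# A too-small cron interval is rejected, mirroring the check in enqueue!.
def resolve_options(defaults, max_attempts: nil, timeout: nil, queue: nil, cron_interval: nil)
  if defaults
    max_attempts  = defaults.max_attempts  if max_attempts.nil?
    timeout       = defaults.timeout       if timeout.nil?
    queue         = defaults.queue         if queue.nil?
    cron_interval = defaults.cron_interval if cron_interval.nil?
  end

  if cron_interval && cron_interval < CRON_INTERVAL_MIN_SKETCH
    raise ArgumentError, "cron interval must be at least #{CRON_INTERVAL_MIN_SKETCH} seconds"
  end

  { max_attempts: max_attempts, timeout: timeout, queue: queue, cron_interval: cron_interval }
end

defaults  = SketchOptions.new(max_attempts: 5, timeout: 3600, queue: "ruby")
effective = resolve_options(defaults, max_attempts: 2)
p effective
```

Here max_attempts keeps the explicitly passed value, while timeout and queue fall back to the registered defaults.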
  # File lib/postjob.rb, line 31
  def env
    ENV["POSTJOB_ENV"] || ENV["RAILS_ENV"] || ENV["RACK_ENV"] || "development"
  end
  # File lib/postjob.rb, line 35
  def host_id
    Host.host_id
  end
Processes all waiting jobs.
This method starts processing jobs, as long as there are some. It returns once no runnable jobs can be found anymore.
Note that this method is not limited to the set of runnable jobs present when calling it; if running a job results in newly created runnable jobs these jobs will be processed as well.
This method returns the number of processed jobs.
  # File lib/postjob.rb, line 154
  def process_all
    run do |job|
      !job.nil?
    end
  end
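The "drain until empty" behaviour, including jobs that are created while draining, can be sketched with an in-memory array. This is only an illustration under simplifying assumptions: pending and run_one are hypothetical stand-ins, whereas Postjob itself checks jobs out of its queue.

```ruby
# Hypothetical in-memory queue; each job may spawn child jobs when it runs.
pending = [
  { name: "parent", children: 2 },
  { name: "leaf",   children: 0 }
]

# "Runs" the next job, enqueueing its children. Returns the job, or nil if
# nothing is runnable anymore.
run_one = lambda do
  job = pending.shift
  next nil unless job

  job[:children].times do |i|
    pending << { name: "#{job[:name]}-child-#{i}", children: 0 }
  end
  job
end

# Keep going until no runnable job is left; children created along the way
# are picked up by the same loop.
processed = 0
loop do
  job = run_one.call
  break if job.nil?

  processed += 1
end

puts "processed #{processed} jobs"
```

Two jobs are present at the start, but four are processed, because the two children of the first job are drained as well.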
Registers a workflow.
This call registers a workflow with a set of options.
This is usually used via
  module X
    Postjob.register_workflow self, max_attempts: 2, timeout: 86400
  end
The workflow parameter is either a module which implements a workflow, or a String with the name of a workflow. The options parameter is a set of options.
Options include:
- version
- max_attempts
- timeout
- sticky
- cron_interval
- queue
  # File lib/postjob.rb, line 307
  def register_workflow(workflow, options = {})
    expect! workflow => [ Module, String ]

    workflow.extend Postjob::Workflow if workflow.is_a?(Module)
    Registry.register workflow, options
  end
Explicitly resolves a workflow.
  # File lib/postjob.rb, line 276
  def resolve(token:, result:)
    job = Queue.find_job_by_token(token)
    raise "No job with token #{token}" unless job

    Queue.set_job_result current_session_id, job, result, version: nil
  end
Processes many jobs.
This method starts processing jobs, as long as there are some. If no jobs can be found this method waits until a job becomes available.
After each step the id of the processed job is yielded into the passed-in block; if no job could be checked out, the block receives nil.
This method continues until:
a) the requested number of jobs (via the count: argument) was processed (note: repeated job executions due to rerunning jobs that slept or errored count multiple times), or
b) the block yielded into returns false, or
c) a shutdown is signalled.
This method returns the number of processed jobs.
  # File lib/postjob.rb, line 174
  def run(count: nil, queues: nil, heartbeat: true, &block)
    queues ||= Postjob::Registry.queues

    # to run 10^12 jobs that would take 1 msecs each we would need, at least,
    # about 31 years - so this default should be fine. Also, someone should
    # update the machine in the meantime :)
    count ||= 1_000_000_000_000

    with_worker_session heartbeat: heartbeat, queues: queues do
      processed_jobs_count = 0

      loop do
        processed_job_id, shutdown = Postjob.step(queues: queues)
        processed_jobs_count += 1 if processed_job_id

        break if processed_jobs_count >= count
        break if block && yield(processed_job_id) == false
        break if shutdown == :shutdown
        next if processed_job_id

        # nothing to do: block until a new job is announced
        shutdown = Queue::Notifications.wait_for_new_job(current_session_id, queues: queues)
        break if shutdown == :shutdown
      end

      processed_jobs_count
    end
  end
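The two termination conditions, count: and a block returning false, can be tried out on a simplified loop. The sketch below is an assumption-laden stand-in, not Postjob's code: run_sketch is a hypothetical helper that iterates over a fixed list of job ids and leaves out waiting, heartbeats and shutdown handling.

```ruby
# Hypothetical, simplified stand-in for the run loop: job_ids plays the role
# of the jobs that would be checked out one after another.
def run_sketch(job_ids, count: nil)
  count ||= Float::INFINITY
  processed = 0

  job_ids.each do |id|
    processed += 1

    # Same order as in the listing above: the count check comes first, so the
    # block is not called for the job that reaches the limit.
    break if processed >= count
    break if block_given? && yield(id) == false
  end

  processed
end

all_jobs        = run_sketch([1, 2, 3])
limited         = run_sketch([1, 2, 3], count: 2)
stopped_by_blk  = run_sketch([1, 2, 3]) { |id| id < 2 }

puts "all: #{all_jobs}, limited: #{limited}, stopped by block: #{stopped_by_blk}"
```

Only an explicit false from the block stops the loop; a nil return value does not, because the check compares against false.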
Runs a single job.
This method tries to check out a runnable job. If it finds one the job is processed (via Postjob.process_job).
This method returns a tuple [ <job-id>, <shutdown> ], where
- <job-id> is the id of the job which has been processed;
- <shutdown> is a flag, either :shutdown or nil. :shutdown notifies self.run to terminate the run loop.
It returns nil when no job could be checked out.
  # File lib/postjob.rb, line 254
  def step(queues: nil)
    expect! queues => [Array, nil]
    queues ||= Postjob::Registry.queues

    job = Postjob::Queue.checkout(current_session_id, queues: queues)
    [ job.id, process_job(job) ] if job
  end
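Because the return value is either a two-element array or nil, a caller can destructure it with multiple assignment in both cases. The following sketch shows this with step_sketch, a hypothetical stand-in that takes the job id as an argument instead of checking a job out of the queue.

```ruby
# Hypothetical stand-in for step: returns [job_id, shutdown] or nil.
def step_sketch(job_id)
  [job_id, nil] if job_id
end

# A job was processed: both elements are assigned from the array.
job_id, shutdown = step_sketch(42)

# No job was available: multiple assignment from nil leaves both variables nil.
no_job_id, no_shutdown = step_sketch(nil)

p [job_id, shutdown, no_job_id, no_shutdown]
```

This is why the run loop can write `processed_job_id, shutdown = Postjob.step(...)` without a separate nil check.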
  # File lib/postjob.rb, line 235
  def stop_worker_session!
    return unless @worker_session

    WorkerSession.stop!(@worker_session)
    @worker_session = nil
  end
Private Instance Methods
  # File lib/postjob.rb, line 346
  def enqueue_cleanup(job)
    spec = Postjob::Registry.lookup!(name: job.workflow, version: job.workflow_version)
    return unless spec.supports_cleanup?

    Postjob.logger.info "Enqueuing cleanup job #{spec.name}.cleanup(#{job.args.map(&:inspect).join(", ")})"

    Simple::SQL.transaction do
      job_id = enqueue! "#{job.workflow}.cleanup", *job.args, queue: job.queue,
                                                              parent_id: nil,
                                                              max_attempts: nil,
                                                              timeout: nil,
                                                              version: job.workflow_version,
                                                              tags: job.tags,
                                                              cron_interval: nil,
                                                              sticky: job.is_sticky

      if job.is_sticky
        Simple::SQL.ask "UPDATE postjob.postjobs SET sticky_host_id=$1 WHERE id=$2", job.sticky_host_id, job_id
      end
    end
  end
  # File lib/postjob.rb, line 135
  def stringify_hash(hsh)
    hsh.inject({}) do |r, (k, v)|
      k = k.to_s if k.is_a?(Symbol)
      r.update k => v
    end
  end