CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.session_should_shutdown(p_worker_session_id UUID)

RETURNS BOOLEAN

AS $$ DECLARE

session {SCHEMA_NAME}.worker_sessions;
host {SCHEMA_NAME}.hosts;

BEGIN

SELECT * INTO session
FROM {SCHEMA_NAME}.worker_sessions WHERE id=p_worker_session_id;

SELECT * INTO host
FROM {SCHEMA_NAME}.hosts WHERE id=session.host_id;

IF host.status != 'shutdown' THEN
  RETURN FALSE;
END IF;

-- If there unfinished sticky jobs on this host we do not shutdown this session.

IF EXISTS (
  SELECT 1 FROM postjob.postjobs WHERE
    status NOT IN ('ok', 'failed', 'timeout')
    AND sticky_host_id=host.id
) THEN
  RETURN FALSE;
END IF;

RETURN TRUE;

END; $$ LANGUAGE plpgsql;

DROP FUNCTION IF EXISTS {SCHEMA_NAME}.time_to_next_job(workflows_with_versions varchar[]); – removed in 0.5.0 DROP FUNCTION IF EXISTS {SCHEMA_NAME}.time_to_next_job(p_worker_session_id UUID); – removed in 0.5.7 CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.time_to_next_job(p_worker_session_id UUID, p_queue varchar[])

RETURNS float

AS $$ DECLARE

p_processable_at timestamp;
session {SCHEMA_NAME}.worker_sessions;
host {SCHEMA_NAME}.hosts;
p_current_greedy_job {SCHEMA_NAME}.postjobs;

BEGIN

SELECT * INTO session
FROM {SCHEMA_NAME}.worker_sessions WHERE id=p_worker_session_id;

SELECT * INTO host
FROM {SCHEMA_NAME}.hosts WHERE id=session.host_id;

SELECT * INTO p_current_greedy_job
FROM {SCHEMA_NAME}.postjobs WHERE
  status NOT IN ('ok', 'failed', 'timeout')
  AND id=root_id
  AND is_greedy
LIMIT 1;

SELECT MIN(processable_at) INTO p_processable_at FROM (
    SELECT MIN(timing_out_at) AS processable_at
    FROM {SCHEMA_NAME}.postjobs
    WHERE status IN ('ready', 'err', 'sleep')
  UNION
    SELECT MIN(next_run_at) AS processable_at
    FROM {SCHEMA_NAME}.postjobs
    WHERE status IN ('ready', 'err')
      AND (p_queue IS NULL OR queue = ANY (p_queue))
      AND (workflow || workflow_version) = ANY (session.workflows)
      AND COALESCE(sticky_host_id, {SCHEMA_NAME}._null_uuid()) IN (session.host_id, {SCHEMA_NAME}._null_uuid()) -- matches sticky_host_id for started sticky jobs and _null_uuid for 
                                                                                                                -- non-started or non-sticky jobs against the host_id and _null_uuid
      AND (
        p_current_greedy_job.id IS NULL OR root_id=p_current_greedy_job.root_id                                 -- if there is a greedy job on this host_id which is not finished yet,
                                                                                                                -- only jobs belonging to this root jobs are allowed.
      )
      AND ((host.status != 'shutdown') OR root_id=p_current_greedy_job.root_id)                                 -- during shutdown only get greedy jobs.
) sq;

RETURN EXTRACT(EPOCH FROM p_processable_at - (now() at time zone 'utc'));

END; $$ LANGUAGE plpgsql;

DROP FUNCTION IF EXISTS {SCHEMA_NAME}.checkout(workflows_with_versions varchar[], p_fast_mode BOOLEAN); – removed in 0.5.0 DROP FUNCTION IF EXISTS {SCHEMA_NAME}.checkout(p_worker_session_id UUID, p_fast_mode BOOLEAN); – removed in 0.5.8

CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.checkout(

  p_worker_session_id UUID,
  p_fast_mode BOOLEAN,
  p_queue varchar[])
RETURNS SETOF {SCHEMA_NAME}.postjobs

AS $$ DECLARE

job {SCHEMA_NAME}.postjobs;
session {SCHEMA_NAME}.worker_sessions;
host {SCHEMA_NAME}.hosts;
p_current_greedy_job {SCHEMA_NAME}.postjobs;

BEGIN

SELECT * INTO session
FROM {SCHEMA_NAME}.worker_sessions WHERE id=p_worker_session_id;

--
-- Note on "FOR UPDATE":
--
-- We don't want multiple sessions to run this function in parallel. This can lead to a situation
-- where multiple greedy root jobs could be selected for different workers with identical host ids
-- at the same time. We therefore lock the function here against parallel usage - and we use the
-- hosts table for locking. This look will be released automatically with the current transaction,
-- i.e. typically after the "SELECT * FROM checkout(..)" returns.
--
SELECT * INTO host
FROM {SCHEMA_NAME}.hosts WHERE id=session.host_id FOR UPDATE;

SELECT * INTO p_current_greedy_job
FROM {SCHEMA_NAME}.postjobs WHERE
  status NOT IN ('ok', 'failed', 'timeout')
  AND id=root_id
  AND sticky_host_id=session.host_id
  AND is_greedy
LIMIT 1;

LOOP
  -- try to checkout a job. Each of the conditions here is matching
  -- one of the CASE .. WHEN clauses below.
  SELECT INTO job *
  FROM {SCHEMA_NAME}.postjobs s
  WHERE
    (
      s.status IN ('ready', 'err', 'sleep')
      AND s.timing_out_at <= (now() at time zone 'utc')
    )
    OR
    (
      s.status IN ('ready', 'err')
      AND (p_queue IS NULL OR queue = ANY (p_queue))
      AND s.next_run_at <= (now() at time zone 'utc')
      AND (s.workflow || s.workflow_version) = ANY (session.workflows)
      AND COALESCE(s.sticky_host_id, {SCHEMA_NAME}._null_uuid()) IN (session.host_id, {SCHEMA_NAME}._null_uuid())
      AND (
        p_current_greedy_job.id IS NULL OR s.root_id=p_current_greedy_job.root_id                               -- if there is a greedy job on this host_id which is not finished yet,
                                                                                                                -- only jobs belonging to this root jobs are allowed.
      )
      AND ((host.status != 'shutdown') OR root_id=p_current_greedy_job.root_id)                                 -- during shutdown only get greedy jobs.
    )
  ORDER BY (LEAST(s.next_run_at, s.timing_out_at))
  FOR UPDATE SKIP LOCKED
  LIMIT 1;

  CASE
    WHEN job.id IS NULL THEN
      -- couldn't find a job?
      EXIT;
    WHEN job.status IN ('ready', 'err', 'sleep') AND job.timing_out_at <= (now() at time zone 'utc') THEN
      -- job timed out? mark it as such, and try next one.
      PERFORM {SCHEMA_NAME}._set_job_timeout(p_worker_session_id, job.id, p_fast_mode);
      CONTINUE;
    ELSE
      -- set job to processing
      PERFORM {SCHEMA_NAME}._set_job_processing(p_worker_session_id, job.id);
      RETURN QUERY SELECT * FROM {SCHEMA_NAME}.postjobs WHERE id=job.id;
      EXIT;
  END CASE;
END LOOP;

END; $$ LANGUAGE plpgsql;