CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.session_should_shutdown(p_worker_session_id UUID)
RETURNS BOOLEAN
AS $$ DECLARE
session {SCHEMA_NAME}.worker_sessions; host {SCHEMA_NAME}.hosts;
BEGIN
SELECT * INTO session FROM {SCHEMA_NAME}.worker_sessions WHERE id=p_worker_session_id; SELECT * INTO host FROM {SCHEMA_NAME}.hosts WHERE id=session.host_id; IF host.status != 'shutdown' THEN RETURN FALSE; END IF; -- If there unfinished sticky jobs on this host we do not shutdown this session. IF EXISTS ( SELECT 1 FROM postjob.postjobs WHERE status NOT IN ('ok', 'failed', 'timeout') AND sticky_host_id=host.id ) THEN RETURN FALSE; END IF; RETURN TRUE;
END; $$ LANGUAGE plpgsql;
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.time_to_next_job(workflows_with_versions varchar[]); – removed in 0.5.0 DROP FUNCTION IF EXISTS {SCHEMA_NAME}.time_to_next_job(p_worker_session_id UUID); – removed in 0.5.7 CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.time_to_next_job(p_worker_session_id UUID, p_queue varchar[])
RETURNS float
AS $$ DECLARE
p_processable_at timestamp; session {SCHEMA_NAME}.worker_sessions; host {SCHEMA_NAME}.hosts; p_current_greedy_job {SCHEMA_NAME}.postjobs;
BEGIN
SELECT * INTO session FROM {SCHEMA_NAME}.worker_sessions WHERE id=p_worker_session_id; SELECT * INTO host FROM {SCHEMA_NAME}.hosts WHERE id=session.host_id; SELECT * INTO p_current_greedy_job FROM {SCHEMA_NAME}.postjobs WHERE status NOT IN ('ok', 'failed', 'timeout') AND id=root_id AND is_greedy LIMIT 1; SELECT MIN(processable_at) INTO p_processable_at FROM ( SELECT MIN(timing_out_at) AS processable_at FROM {SCHEMA_NAME}.postjobs WHERE status IN ('ready', 'err', 'sleep') UNION SELECT MIN(next_run_at) AS processable_at FROM {SCHEMA_NAME}.postjobs WHERE status IN ('ready', 'err') AND (p_queue IS NULL OR queue = ANY (p_queue)) AND (workflow || workflow_version) = ANY (session.workflows) AND COALESCE(sticky_host_id, {SCHEMA_NAME}._null_uuid()) IN (session.host_id, {SCHEMA_NAME}._null_uuid()) -- matches sticky_host_id for started sticky jobs and _null_uuid for -- non-started or non-sticky jobs against the host_id and _null_uuid AND ( p_current_greedy_job.id IS NULL OR root_id=p_current_greedy_job.root_id -- if there is a greedy job on this host_id which is not finished yet, -- only jobs belonging to this root jobs are allowed. ) AND ((host.status != 'shutdown') OR root_id=p_current_greedy_job.root_id) -- during shutdown only get greedy jobs. ) sq; RETURN EXTRACT(EPOCH FROM p_processable_at - (now() at time zone 'utc'));
END; $$ LANGUAGE plpgsql;
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.checkout(workflows_with_versions varchar[], p_fast_mode BOOLEAN); – removed in 0.5.0 DROP FUNCTION IF EXISTS {SCHEMA_NAME}.checkout(p_worker_session_id UUID, p_fast_mode BOOLEAN); – removed in 0.5.8
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.checkout(
p_worker_session_id UUID, p_fast_mode BOOLEAN, p_queue varchar[]) RETURNS SETOF {SCHEMA_NAME}.postjobs
AS $$ DECLARE
job {SCHEMA_NAME}.postjobs; session {SCHEMA_NAME}.worker_sessions; host {SCHEMA_NAME}.hosts; p_current_greedy_job {SCHEMA_NAME}.postjobs;
BEGIN
SELECT * INTO session FROM {SCHEMA_NAME}.worker_sessions WHERE id=p_worker_session_id; -- -- Note on "FOR UPDATE": -- -- We don't want multiple sessions to run this function in parallel. This can lead to a situation -- where multiple greedy root jobs could be selected for different workers with identical host ids -- at the same time. We therefore lock the function here against parallel usage - and we use the -- hosts table for locking. This look will be released automatically with the current transaction, -- i.e. typically after the "SELECT * FROM checkout(..)" returns. -- SELECT * INTO host FROM {SCHEMA_NAME}.hosts WHERE id=session.host_id FOR UPDATE; SELECT * INTO p_current_greedy_job FROM {SCHEMA_NAME}.postjobs WHERE status NOT IN ('ok', 'failed', 'timeout') AND id=root_id AND sticky_host_id=session.host_id AND is_greedy LIMIT 1; LOOP -- try to checkout a job. Each of the conditions here is matching -- one of the CASE .. WHEN clauses below. SELECT INTO job * FROM {SCHEMA_NAME}.postjobs s WHERE ( s.status IN ('ready', 'err', 'sleep') AND s.timing_out_at <= (now() at time zone 'utc') ) OR ( s.status IN ('ready', 'err') AND (p_queue IS NULL OR queue = ANY (p_queue)) AND s.next_run_at <= (now() at time zone 'utc') AND (s.workflow || s.workflow_version) = ANY (session.workflows) AND COALESCE(s.sticky_host_id, {SCHEMA_NAME}._null_uuid()) IN (session.host_id, {SCHEMA_NAME}._null_uuid()) AND ( p_current_greedy_job.id IS NULL OR s.root_id=p_current_greedy_job.root_id -- if there is a greedy job on this host_id which is not finished yet, -- only jobs belonging to this root jobs are allowed. ) AND ((host.status != 'shutdown') OR root_id=p_current_greedy_job.root_id) -- during shutdown only get greedy jobs. ) ORDER BY (LEAST(s.next_run_at, s.timing_out_at)) FOR UPDATE SKIP LOCKED LIMIT 1; CASE WHEN job.id IS NULL THEN -- couldn't find a job? EXIT; WHEN job.status IN ('ready', 'err', 'sleep') AND job.timing_out_at <= (now() at time zone 'utc') THEN -- job timed out? mark it as such, and try next one. PERFORM {SCHEMA_NAME}._set_job_timeout(p_worker_session_id, job.id, p_fast_mode); CONTINUE; ELSE -- set job to processing PERFORM {SCHEMA_NAME}._set_job_processing(p_worker_session_id, job.id); RETURN QUERY SELECT * FROM {SCHEMA_NAME}.postjobs WHERE id=job.id; EXIT; END CASE; END LOOP;
END; $$ LANGUAGE plpgsql;