-- Stamps a job row with a workflow version.
--
-- Parameters:
--   job_id     id of the postjobs row to update
--   p_version  value written to workflow_version; when NULL the row is
--              left untouched
--
-- Whenever a version is written, updated_at is set to the current UTC time.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}._update_job_version(job_id BIGINT, p_version VARCHAR)
RETURNS VOID
AS $$
BEGIN
  -- Without a version there is nothing to record.
  IF p_version IS NULL THEN
    RETURN;
  END IF;

  UPDATE {SCHEMA_NAME}.postjobs
     SET workflow_version = p_version,
         updated_at       = (now() at time zone 'utc')
   WHERE id = job_id;
END;
$$ LANGUAGE plpgsql;
-- Old signature (without p_worker_session_id): removed in 0.5.0
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.set_job_result(job_id BIGINT, p_results JSONB, p_version VARCHAR);

-- Stores the result of a successfully finished job.
--
-- Parameters:
--   p_worker_session_id  worker session reporting the result; passed on to
--                        _reset_job_processing and _wakeup_parent_job
--   job_id               id of the postjobs row
--   p_results            value written to the results column
--   p_version            workflow version, recorded via _update_job_version
--                        (ignored there when NULL)
--
-- The job row ends up with status 'ok', no error information, and no
-- scheduled next run.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.set_job_result(p_worker_session_id UUID, job_id BIGINT, p_results JSONB, p_version VARCHAR)
RETURNS VOID AS $$
BEGIN
  PERFORM {SCHEMA_NAME}._update_job_version(job_id, p_version);
  PERFORM {SCHEMA_NAME}._reset_job_processing(p_worker_session_id, job_id);

  -- write the result and clear any error left over from earlier attempts
  UPDATE {SCHEMA_NAME}.postjobs
     SET results         = p_results,
         error           = NULL,
         error_message   = NULL,
         error_backtrace = NULL,
         status          = 'ok',
         next_run_at     = NULL
   WHERE id = job_id;

  PERFORM {SCHEMA_NAME}._wakeup_parent_job(p_worker_session_id, job_id);
END;
$$ LANGUAGE plpgsql;
-- Old signature (without p_worker_session_id): removed in 0.5.0
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.set_job_pending(job_id BIGINT, p_version VARCHAR);

-- Puts a job to sleep: status becomes 'sleep' and no next run is scheduled.
--
-- Parameters:
--   p_worker_session_id  worker session reporting the state; stored as the
--                        job's last_worker_session_id and passed on to
--                        _reset_job_processing
--   job_id               id of the postjobs row
--   p_version            workflow version, recorded via _update_job_version
--                        (ignored there when NULL)
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.set_job_pending(p_worker_session_id UUID, job_id BIGINT, p_version VARCHAR)
RETURNS VOID AS $$
BEGIN
  PERFORM {SCHEMA_NAME}._update_job_version(job_id, p_version);
  PERFORM {SCHEMA_NAME}._reset_job_processing(p_worker_session_id, job_id);

  UPDATE {SCHEMA_NAME}.postjobs
     SET last_worker_session_id = p_worker_session_id,
         status                 = 'sleep',
         next_run_at            = NULL
   WHERE id = job_id;
END;
$$ LANGUAGE plpgsql;
-- NOTE(review): _prepare_next_run has the same argument types as
-- _prepare_rerun below and is presumably its former name - confirm.
DROP FUNCTION IF EXISTS {SCHEMA_NAME}._prepare_next_run(BIGINT, {SCHEMA_NAME}.statuses, BOOLEAN);

-- Sets the final status of a failed job and schedules its next run, if any.
--
-- If this is a recoverable error ('err') and another attempt is possible,
-- the status stays 'err' and next_run_at is set. Otherwise next_run_at is
-- NULL and the status is 'failed' or 'timeout'.
--
-- Parameters:
--   job_id       id of the postjobs row
--   p_status     one of 'err', 'timeout', 'failed'; anything else
--                (including NULL) raises an exception
--   p_fast_mode  when true the retry delay base is 0.01 seconds instead
--                of 10 seconds
--
-- The retry delay is base * 1.5 ^ failed_attempts seconds, counted from the
-- current UTC time.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}._prepare_rerun(
  job_id BIGINT, p_status {SCHEMA_NAME}.statuses, p_fast_mode BOOLEAN)
RETURNS VOID AS $$
DECLARE
  next_run_at_basetime DOUBLE PRECISION;
BEGIN
  -- NULL must be rejected explicitly: "NULL NOT IN (...)" evaluates to NULL,
  -- which would silently skip the check.
  IF p_status IS NULL OR p_status NOT IN ('err', 'timeout', 'failed') THEN
    RAISE 'Invalid status value %', p_status;
  END IF;

  -- If this is a recoverable error we check if we have any remaining
  -- attempts. IS NOT TRUE also covers a missing job row (the subquery then
  -- yields NULL), in which case the error is treated as permanent.
  IF p_status = 'err' THEN
    IF ((SELECT max_attempts - failed_attempts
           FROM {SCHEMA_NAME}.postjobs
          WHERE id = job_id) > 0) IS NOT TRUE THEN
      p_status := 'failed';
    END IF;
  END IF;

  -- set status, clear next_run_at
  UPDATE {SCHEMA_NAME}.postjobs
     SET status      = p_status,
         next_run_at = NULL
   WHERE id = job_id;

  -- fill in next_run_at; only 'err' jobs get another run, 'failed' and
  -- 'timeout' keep next_run_at = NULL
  IF p_status = 'err' THEN
    next_run_at_basetime := CASE WHEN p_fast_mode THEN 0.01 ELSE 10 END;

    UPDATE {SCHEMA_NAME}.postjobs
       SET next_run_at = (now() at time zone 'utc')
                         + next_run_at_basetime * pow(1.5, failed_attempts) * interval '1 second'
     WHERE id = job_id;
  END IF;
END;
$$ LANGUAGE plpgsql;
-- Old signature (without p_worker_session_id): removed in 0.5.0
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.set_job_error(
  job_id BIGINT, p_error VARCHAR, p_error_message VARCHAR, p_error_backtrace JSONB,
  p_status {SCHEMA_NAME}.statuses, p_version VARCHAR, p_fast_mode BOOLEAN);

-- Records a failed run of a job and prepares its next run, if any.
--
-- Parameters:
--   p_worker_session_id  worker session reporting the error; stored as the
--                        job's last_worker_session_id and passed on to
--                        _reset_job_processing and _wakeup_parent_job
--   job_id               id of the postjobs row
--   p_error              value written to the error column
--   p_error_message      value written to the error_message column
--   p_error_backtrace    value written to the error_backtrace column
--   p_status             status handed to _prepare_rerun, which decides the
--                        final status and next_run_at
--   p_version            workflow version, recorded via _update_job_version
--                        (ignored there when NULL)
--   p_fast_mode          handed to _prepare_rerun
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.set_job_error(
  p_worker_session_id UUID, job_id BIGINT, p_error VARCHAR, p_error_message VARCHAR,
  p_error_backtrace JSONB, p_status {SCHEMA_NAME}.statuses, p_version VARCHAR, p_fast_mode BOOLEAN)
RETURNS VOID AS $$
BEGIN
  PERFORM {SCHEMA_NAME}._update_job_version(job_id, p_version);
  PERFORM {SCHEMA_NAME}._reset_job_processing(p_worker_session_id, job_id);

  -- write error info; failed_attempts is incremented here, before
  -- _prepare_rerun runs, so the rerun decision sees the updated count
  UPDATE {SCHEMA_NAME}.postjobs
     SET last_worker_session_id = p_worker_session_id,
         error                  = p_error,
         error_message          = p_error_message,
         error_backtrace        = p_error_backtrace,
         failed_attempts        = failed_attempts + 1,
         next_run_at            = NULL
   WHERE id = job_id;

  -- prepare next run, if any
  PERFORM {SCHEMA_NAME}._prepare_rerun(job_id, p_status, p_fast_mode);
  PERFORM {SCHEMA_NAME}._wakeup_parent_job(p_worker_session_id, job_id);
END;
$$ LANGUAGE plpgsql;
-- Old signature (without p_worker_session_id): removed in 0.5.0
DROP FUNCTION IF EXISTS {SCHEMA_NAME}._set_job_timeout(job_id BIGINT, p_fast_mode BOOLEAN);

-- Marks a job as timed out.
--
-- Parameters:
--   p_worker_session_id  worker session passed on to _reset_job_processing
--                        and _wakeup_parent_job
--   job_id               id of the postjobs row
--   p_fast_mode          handed to _prepare_rerun
--
-- The job's error is set to 'Timeout' / 'timeout', any backtrace is
-- cleared, and failed_attempts is incremented. _prepare_rerun is then
-- called with status 'timeout'.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}._set_job_timeout(
  p_worker_session_id UUID, job_id BIGINT, p_fast_mode BOOLEAN)
RETURNS VOID AS $$
BEGIN
  PERFORM {SCHEMA_NAME}._reset_job_processing(p_worker_session_id, job_id);

  -- write error info
  UPDATE {SCHEMA_NAME}.postjobs
     SET error           = 'Timeout',
         error_message   = 'timeout',
         error_backtrace = NULL,
         failed_attempts = failed_attempts + 1,
         next_run_at     = NULL
   WHERE id = job_id;

  -- prepare next run, if any
  PERFORM {SCHEMA_NAME}._prepare_rerun(job_id, 'timeout', p_fast_mode);
  PERFORM {SCHEMA_NAME}._wakeup_parent_job(p_worker_session_id, job_id);
END;
$$ LANGUAGE plpgsql;