-- Drop the 10-, 11- and 12-argument overloads of enqueue, if present.
-- Each statement is a no-op when the overload does not exist.

-- 10-argument overload
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.enqueue(
  p_worker_session_id UUID,
  queue               VARCHAR,
  workflow            VARCHAR,
  workflow_method     VARCHAR,
  workflow_version    VARCHAR,
  args                JSONB,
  parent_id           BIGINT,
  tags                JSONB,
  max_attempts        INTEGER,
  timeout             DOUBLE PRECISION
);

-- 11-argument overload (adds p_cron_interval)
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.enqueue(
  p_worker_session_id UUID,
  queue               VARCHAR,
  workflow            VARCHAR,
  workflow_method     VARCHAR,
  workflow_version    VARCHAR,
  args                JSONB,
  parent_id           BIGINT,
  tags                JSONB,
  max_attempts        INTEGER,
  timeout             DOUBLE PRECISION,
  p_cron_interval     INTEGER
);

-- 12-argument overload (adds p_is_sticky)
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.enqueue(
  p_worker_session_id UUID,
  queue               VARCHAR,
  workflow            VARCHAR,
  workflow_method     VARCHAR,
  workflow_version    VARCHAR,
  args                JSONB,
  parent_id           BIGINT,
  tags                JSONB,
  max_attempts        INTEGER,
  timeout             DOUBLE PRECISION,
  p_cron_interval     INTEGER,
  p_is_sticky         BOOLEAN
);
-- enqueue: insert a new row into postjobs, fill in its root_id and full_id,
-- and return the resulting row.
--
-- Parameters:
--   p_worker_session_id  stored as last_worker_session_id of the new row
--   queue                queue name, NULL falls back to the DEFAULT_QUEUE
--                        template placeholder
--   workflow             workflow name, the value '__manual__' forces
--                        max_attempts to 1
--   workflow_method      stored as is
--   workflow_version     stored as is, NULL is replaced by ''
--   args                 stored as is
--   parent_id            id of the parent job, or NULL for a root job
--   tags                 stored as is
--   max_attempts         NULL defaults to 5 (to 1 for '__manual__' jobs)
--   timeout              number of seconds added to the current UTC time to
--                        compute timing_out_at, NULL yields a NULL timing_out_at
--   p_cron_interval      when given, must be >= 1 and parent_id must be NULL
--   p_is_sticky          NULL is treated as FALSE, forced to TRUE when the
--                        job is greedy
--   p_is_greedy          NULL is treated as FALSE
--
-- Returns: the postjobs row that was created.
--
-- Raises:
--   'A cron job must be a root job'       p_cron_interval with a parent_id
--   'The cron_interval must be positive'  p_cron_interval < 1
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.enqueue(
  p_worker_session_id UUID,
  queue               VARCHAR,
  workflow            VARCHAR,
  workflow_method     VARCHAR,
  workflow_version    VARCHAR,
  args                JSONB,
  parent_id           BIGINT,
  tags                JSONB,
  max_attempts        INTEGER,
  timeout             DOUBLE PRECISION,
  p_cron_interval     INTEGER,
  p_is_sticky         BOOLEAN,
  p_is_greedy         BOOLEAN
) RETURNS SETOF {SCHEMA_NAME}.postjobs
AS $$
DECLARE
  job_id BIGINT;
BEGIN
  -- check arguments --------------------------------------------------------

  IF workflow = '__manual__' THEN
    -- Only tell the caller when an explicit, different value is overridden.
    IF max_attempts IS NOT NULL AND max_attempts != 1 THEN
      RAISE NOTICE 'Adjusting max_attempts of __manual__ job to 1';
    END IF;

    -- Always pin manual jobs to a single attempt. Without this a NULL
    -- max_attempts would fall through to the general default of 5 below.
    max_attempts := 1;
  END IF;

  IF p_cron_interval IS NOT NULL THEN
    IF parent_id IS NOT NULL THEN
      RAISE 'A cron job must be a root job';
    END IF;

    IF p_cron_interval < 1 THEN
      RAISE 'The cron_interval must be positive';
    END IF;
  END IF;

  -- apply defaults ---------------------------------------------------------

  workflow_version := COALESCE(workflow_version, '');
  queue            := COALESCE(queue, '{DEFAULT_QUEUE}');
  max_attempts     := COALESCE(max_attempts, 5);
  p_is_greedy      := COALESCE(p_is_greedy, FALSE);
  -- a greedy job is always sticky
  p_is_sticky      := p_is_greedy OR COALESCE(p_is_sticky, FALSE);

  -- create postjobs entry --------------------------------------------------

  INSERT INTO {SCHEMA_NAME}.postjobs (
    last_worker_session_id,
    queue,
    workflow,
    workflow_method,
    workflow_version,
    args,
    parent_id,
    tags,
    max_attempts,
    timing_out_at,
    cron_interval,
    is_sticky,
    is_greedy
  )
  VALUES (
    p_worker_session_id,
    queue,
    workflow,
    workflow_method,
    workflow_version,
    args,
    parent_id,
    tags,
    max_attempts,
    (now() at time zone 'utc') + timeout * interval '1 second',
    p_cron_interval,
    p_is_sticky,
    p_is_greedy
  )
  RETURNING {SCHEMA_NAME}.postjobs.id INTO job_id;

  -- fill in root_id and full_id --------------------------------------------
  --
  -- Both values depend on the id generated by the INSERT above, hence the
  -- separate UPDATE. A job without a parent row is its own root, and its
  -- full_id is just its id. Otherwise the parent's root_id is inherited and
  -- the full_id is the parent's full_id followed by '.' and the new id.

  UPDATE {SCHEMA_NAME}.postjobs AS job
  SET
    root_id = COALESCE(
      (
        SELECT parent.root_id
        FROM {SCHEMA_NAME}.postjobs AS parent
        WHERE parent.id = job.parent_id
      ),
      job.id
    ),
    full_id = COALESCE(
      (
        SELECT parent.full_id
        FROM {SCHEMA_NAME}.postjobs AS parent
        WHERE parent.id = job.parent_id
      ) || '.' || job.id,
      job.id::varchar
    )
  WHERE job.id = job_id;

  -- return the job ---------------------------------------------------------

  RETURN QUERY
    SELECT job.*
    FROM {SCHEMA_NAME}.postjobs AS job
    WHERE job.id = job_id;
END;
$$ LANGUAGE plpgsql;