-- Remove the insert-notification trigger and then its function. The trigger
-- goes first so the function has no dependent trigger left when it is dropped.
DROP TRIGGER que_job_notify ON que_jobs;
DROP FUNCTION que_job_notify();

-- Drop the index named after job_schema_version, then the
-- job_schema_version column itself from both tables.
DROP INDEX que_poll_idx_with_job_schema_version;

ALTER TABLE que_jobs
    DROP COLUMN job_schema_version;

ALTER TABLE que_lockers
    DROP COLUMN job_schema_version;
-- Trigger function: when a job row is inserted and is runnable now, pick one
-- listening locker whose queues include the job's queue and send it a
-- 'job_available' JSON message on the channel 'que_listener_<pid>'.
-- Always returns null.
CREATE FUNCTION que_job_notify() RETURNS trigger AS $$
  DECLARE
    locker_pid integer;
    sort_key json;
  BEGIN
    -- Don't do anything if the job is scheduled for a future time.
    IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
      RETURN null;
    END IF;

    -- Pick a locker to notify of the job's insertion, weighted by their number
    -- of workers. Should bounce pseudorandomly between lockers on each
    -- invocation, hence the md5-ordering, but still touch each one equally,
    -- hence the modulo using the job_id.
    SELECT pid
    INTO locker_pid
    FROM (
      -- t3: every candidate row, plus the total number of candidates.
      SELECT *, last_value(row_number) OVER () + 1 AS count
      FROM (
        -- t2: number the candidates from 0 in the md5 order produced by t1.
        SELECT *, row_number() OVER () - 1 AS row_number
        FROM (
          -- t1: one row per worker of each listening locker that serves
          -- the new job's queue.
          SELECT *
          FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
          WHERE listening AND queues @> ARRAY[NEW.queue]
          ORDER BY md5(pid::text || id::text)
        ) t1
      ) t2
    ) t3
    WHERE NEW.id % count = row_number;

    IF locker_pid IS NOT NULL THEN
      -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
      -- rather than throw errors when someone enqueues a big job, just
      -- broadcast the most pertinent information, and let the locker query for
      -- the record after it's taken the lock. The worker will have to hit the
      -- DB in order to make sure the job is still visible anyway.
      SELECT row_to_json(t)
      INTO sort_key
      FROM (
        SELECT
          'job_available' AS message_type,
          NEW.queue       AS queue,
          NEW.priority    AS priority,
          NEW.id          AS id,
          -- Make sure we output timestamps as UTC ISO 8601
          to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
      ) t;

      PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
    END IF;

    -- No locker matched, or the notification has been sent; either way the
    -- function has nothing further to return.
    RETURN null;
  END
$$
LANGUAGE plpgsql;
-- Run public.que_job_notify() once for each row inserted into que_jobs.
CREATE TRIGGER que_job_notify
    AFTER INSERT
    ON que_jobs
    FOR EACH ROW
    EXECUTE PROCEDURE public.que_job_notify();