ALTER TABLE que_jobs SET (fillfactor = 90);

ALTER TABLE que_jobs RENAME COLUMN last_error TO last_error_message;
ALTER TABLE que_jobs RENAME COLUMN job_id TO id;
ALTER TABLE que_jobs RENAME COLUMN args TO old_args;

ALTER SEQUENCE que_jobs_job_id_seq RENAME TO que_jobs_id_seq;
ALTER TABLE que_jobs
  ADD COLUMN last_error_backtrace text,
  ADD COLUMN finished_at timestamptz,
  ADD COLUMN expired_at timestamptz,
  ADD COLUMN args JSONB,
  ADD COLUMN data JSONB;
ALTER TABLE que_jobs DROP CONSTRAINT que_jobs_pkey;
UPDATE que_jobs SET
  queue = CASE queue WHEN '' THEN 'default' ELSE queue END,
  last_error_backtrace =
    CASE
    WHEN last_error_message ~ '\n' THEN
      left(regexp_replace(last_error_message, '^[^\n]+\n', ''), 10000)
    ELSE
      NULL
    END,
  last_error_message = left(substring(last_error_message from '^[^\n]+'), 500),
  args =
    CASE json_typeof(old_args)
    WHEN 'array' THEN old_args::jsonb
    ELSE jsonb_build_array(old_args)
    END,
  data = '{}'::jsonb;
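-- A quick way to spot-check the message/backtrace split performed by the
-- UPDATE above (illustrative, read-only query):
--
--   SELECT last_error_message, left(last_error_backtrace, 80)
--   FROM que_jobs
--   WHERE last_error_message IS NOT NULL
--   LIMIT 10;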
CREATE FUNCTION que_validate_tags(tags_array jsonb) RETURNS boolean AS $$
  SELECT bool_and(
    jsonb_typeof(value) = 'string'
    AND
    char_length(value::text) <= 100
  )
  FROM jsonb_array_elements(tags_array)
$$ LANGUAGE SQL;
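-- A quick sanity check of que_validate_tags (illustrative values; note that
-- char_length(value::text) counts the surrounding JSON quotes, and that an
-- empty array yields NULL, since bool_and over zero rows is NULL):
--
--   SELECT public.que_validate_tags('["low", "billing"]'::jsonb); -- => true
--   SELECT public.que_validate_tags('["ok", 5]'::jsonb);          -- => false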
-- Now that we're done rewriting data, add new indexes.
CREATE INDEX que_poll_idx ON que_jobs (queue, priority, run_at, id)
  WHERE (finished_at IS NULL AND expired_at IS NULL);

CREATE INDEX que_jobs_data_gin_idx ON que_jobs USING gin (data jsonb_path_ops);
CREATE INDEX que_jobs_args_gin_idx ON que_jobs USING gin (args jsonb_path_ops);
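-- que_poll_idx is shaped to serve the locker's polling query, which, roughly
-- sketched (the exact query lives in the Ruby client), filters unfinished,
-- unexpired jobs in a queue and orders by the remaining index columns:
--
--   SELECT id
--   FROM que_jobs
--   WHERE queue = 'default'
--     AND finished_at IS NULL
--     AND expired_at IS NULL
--   ORDER BY priority, run_at, id
--   LIMIT 5;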
ALTER TABLE que_jobs
  ADD PRIMARY KEY (id),
  DROP COLUMN old_args,
  ALTER COLUMN queue SET DEFAULT 'default',
  ALTER COLUMN args SET DEFAULT '[]',
  ALTER COLUMN args SET NOT NULL,
  ALTER COLUMN data SET DEFAULT '{}',
  ALTER COLUMN data SET NOT NULL,
  ADD CONSTRAINT queue_length CHECK (
    char_length(queue) <= 100
  ),
  ADD CONSTRAINT job_class_length CHECK (
    char_length(
      CASE job_class
      WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN
        args->0->>'job_class'
      ELSE
        job_class
      END
    ) <= 200
  ),
  ADD CONSTRAINT valid_args CHECK (
    (jsonb_typeof(args) = 'array')
  ),
  ADD CONSTRAINT valid_data CHECK (
    (jsonb_typeof(data) = 'object')
    AND
    (
      (NOT data ? 'tags')
      OR
      (
        (jsonb_typeof(data->'tags') = 'array')
        AND
        (jsonb_array_length(data->'tags') <= 5)
        AND
        (public.que_validate_tags(data->'tags'))
      )
    )
  ),
  ADD CONSTRAINT error_length CHECK (
    (char_length(last_error_message) <= 500)
    AND
    (char_length(last_error_backtrace) <= 10000)
  );
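-- For reference, a row that satisfies all of the new constraints (the
-- job_class and args here are hypothetical; assumes the pre-existing defaults
-- for priority, run_at, and error_count):
--
--   INSERT INTO que_jobs (job_class, args, data)
--   VALUES ('MyJob', '["some-arg"]', '{"tags": ["example"]}');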
-- This is somewhat heretical, but we're going to need some more flexible
-- storage to support various features without requiring a ton of migrations,
-- which would be a lot of hassle for users. Hopefully this will be used smartly
-- and sparingly (famous last words).
CREATE TABLE que_values (
  key   text  PRIMARY KEY,
  value jsonb NOT NULL DEFAULT '{}',

  CONSTRAINT valid_value CHECK (jsonb_typeof(value) = 'object')
) WITH (FILLFACTOR=90);
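-- Intended usage is simple key/value upserts, along these lines (the key and
-- value here are hypothetical):
--
--   INSERT INTO que_values (key, value)
--   VALUES ('feature_flags', '{"polling": true}')
--   ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;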
CREATE UNLOGGED TABLE que_lockers (
  pid               integer   NOT NULL CONSTRAINT que_lockers_pkey PRIMARY KEY,
  worker_count      integer   NOT NULL,
  worker_priorities integer[] NOT NULL,
  ruby_pid          integer   NOT NULL,
  ruby_hostname     text      NOT NULL,
  queues            text[]    NOT NULL,
  listening         boolean   NOT NULL,

  CONSTRAINT valid_worker_priorities CHECK (
    (array_ndims(worker_priorities) = 1)
    AND
    (array_length(worker_priorities, 1) IS NOT NULL) -- Doesn't do zero.
  ),

  CONSTRAINT valid_queues CHECK (
    (array_ndims(queues) = 1)
    AND
    (array_length(queues, 1) IS NOT NULL) -- Doesn't do zero.
  )
);
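-- Each locker process registers itself with a row along these lines
-- (illustrative values; in practice Que inserts and deletes these rows itself
-- as locker processes start and stop):
--
--   INSERT INTO que_lockers
--     (pid, worker_count, worker_priorities, ruby_pid, ruby_hostname, queues, listening)
--   VALUES
--     (pg_backend_pid(), 2, '{10,50}', 12345, 'worker-1', '{default}', true);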
CREATE FUNCTION que_job_notify() RETURNS trigger AS $$
  DECLARE
    locker_pid integer;
    sort_key json;
  BEGIN
    -- Don't do anything if the job is scheduled for a future time.
    IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
      RETURN null;
    END IF;

    -- Pick a locker to notify of the job's insertion, weighted by their number
    -- of workers. Should bounce pseudorandomly between lockers on each
    -- invocation, hence the md5-ordering, but still touch each one equally,
    -- hence the modulo using the job_id.
    SELECT pid
    INTO locker_pid
    FROM (
      SELECT *, last_value(row_number) OVER () + 1 AS count
      FROM (
        SELECT *, row_number() OVER () - 1 AS row_number
        FROM (
          SELECT *
          FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
          WHERE listening AND queues @> ARRAY[NEW.queue]
          ORDER BY md5(pid::text || id::text)
        ) t1
      ) t2
    ) t3
    WHERE NEW.id % count = row_number;

    IF locker_pid IS NOT NULL THEN
      -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
      -- rather than throw errors when someone enqueues a big job, just
      -- broadcast the most pertinent information, and let the locker query for
      -- the record after it's taken the lock. The worker will have to hit the
      -- DB in order to make sure the job is still visible anyway.
      SELECT row_to_json(t)
      INTO sort_key
      FROM (
        SELECT
          'job_available' AS message_type,
          NEW.queue       AS queue,
          NEW.priority    AS priority,
          NEW.id          AS id,
          -- Make sure we output timestamps as UTC ISO 8601
          to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
      ) t;

      PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
    END IF;

    RETURN null;
  END
$$ LANGUAGE plpgsql;
CREATE TRIGGER que_job_notify
  AFTER INSERT ON que_jobs
  FOR EACH ROW
  EXECUTE PROCEDURE public.que_job_notify();
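-- The chosen locker, LISTENing on 'que_listener_<its pid>', receives a
-- payload along these lines (values illustrative):
--
--   {"message_type":"job_available","queue":"default","priority":100,
--    "id":12345,"run_at":"2022-01-01T00:00:00.000000Z"}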
CREATE FUNCTION que_determine_job_state(job public.que_jobs) RETURNS text AS $$
  SELECT
    CASE
    WHEN job.expired_at  IS NOT NULL    THEN 'expired'
    WHEN job.finished_at IS NOT NULL    THEN 'finished'
    WHEN job.error_count > 0            THEN 'errored'
    WHEN job.run_at > CURRENT_TIMESTAMP THEN 'scheduled'
    ELSE                                     'ready'
    END
$$ LANGUAGE SQL;
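-- Since the function takes a whole row, it can be called directly on the
-- table name as a composite (read-only, safe to run):
--
--   SELECT id, public.que_determine_job_state(que_jobs) AS state
--   FROM que_jobs
--   LIMIT 10;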
CREATE FUNCTION que_state_notify() RETURNS trigger AS $$
  DECLARE
    row record;
    message json;
    previous_state text;
    current_state text;
  BEGIN
    IF TG_OP = 'INSERT' THEN
      previous_state := 'nonexistent';
      current_state  := public.que_determine_job_state(NEW);
      row            := NEW;
    ELSIF TG_OP = 'DELETE' THEN
      previous_state := public.que_determine_job_state(OLD);
      current_state  := 'nonexistent';
      row            := OLD;
    ELSIF TG_OP = 'UPDATE' THEN
      previous_state := public.que_determine_job_state(OLD);
      current_state  := public.que_determine_job_state(NEW);

      -- If the state didn't change, short-circuit.
      IF previous_state = current_state THEN
        RETURN null;
      END IF;

      row := NEW;
    ELSE
      RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP;
    END IF;

    SELECT row_to_json(t)
    INTO message
    FROM (
      SELECT
        'job_change' AS message_type,
        row.id       AS id,
        row.queue    AS queue,

        coalesce(row.data->'tags', '[]'::jsonb) AS tags,

        to_char(row.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at,
        to_char(now()      AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS time,

        CASE row.job_class
        WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN
          coalesce(
            row.args->0->>'job_class',
            'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
          )
        ELSE
          row.job_class
        END AS job_class,

        previous_state AS previous_state,
        current_state  AS current_state
    ) t;

    PERFORM pg_notify('que_state', message::text);

    RETURN null;
  END
$$ LANGUAGE plpgsql;
CREATE TRIGGER que_state_notify
  AFTER INSERT OR UPDATE OR DELETE ON que_jobs
  FOR EACH ROW
  EXECUTE PROCEDURE public.que_state_notify();
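-- Anything LISTENing on the 'que_state' channel now receives a payload like
-- the following on every state transition (values illustrative):
--
--   LISTEN que_state;
--
--   {"message_type":"job_change","id":12345,"queue":"default","tags":[],
--    "run_at":"2022-01-01T00:00:00.000000Z","time":"2022-01-01T00:00:01.000000Z",
--    "job_class":"MyJob","previous_state":"ready","current_state":"finished"}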