DROP TRIGGER que_job_notify ON que_jobs; DROP FUNCTION que_job_notify();

DROP INDEX que_poll_idx_with_job_schema_version;

ALTER TABLE que_jobs

DROP COLUMN job_schema_version;

ALTER TABLE que_lockers

DROP COLUMN job_schema_version;

CREATE FUNCTION que_job_notify() RETURNS trigger AS $$

DECLARE
  locker_pid integer;
  sort_key json;
BEGIN
  -- Don't do anything if the job is scheduled for a future time.
  IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
    RETURN null;
  END IF;

  -- Pick a locker to notify of the job's insertion, weighted by their number
  -- of workers. Should bounce pseudorandomly between lockers on each
  -- invocation, hence the md5-ordering, but still touch each one equally,
  -- hence the modulo using the job_id.
  SELECT pid
  INTO locker_pid
  FROM (
    SELECT *, last_value(row_number) OVER () + 1 AS count
    FROM (
      SELECT *, row_number() OVER () - 1 AS row_number
      FROM (
        SELECT *
        FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
        WHERE
          listening AND
          queues @> ARRAY[NEW.queue]
        ORDER BY md5(pid::text || id::text)
      ) t1
    ) t2
  ) t3
  WHERE NEW.id % count = row_number;

  IF locker_pid IS NOT NULL THEN
    -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
    -- rather than throw errors when someone enqueues a big job, just
    -- broadcast the most pertinent information, and let the locker query for
    -- the record after it's taken the lock. The worker will have to hit the
    -- DB in order to make sure the job is still visible anyway.
    SELECT row_to_json(t)
    INTO sort_key
    FROM (
      SELECT
        'job_available' AS message_type,
        NEW.queue       AS queue,
        NEW.priority    AS priority,
        NEW.id          AS id,
        -- Make sure we output timestamps as UTC ISO 8601
        to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
    ) t;

    PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
  END IF;

  RETURN null;
END

$$ LANGUAGE plpgsql;

CREATE TRIGGER que_job_notify

AFTER INSERT ON que_jobs
FOR EACH ROW
EXECUTE PROCEDURE public.que_job_notify();