-- worker_session_start: starts or reuses a worker_session -----------------------
-- Remove the older two-argument signature (if present) before defining the
-- three-argument one, so the previous overload does not linger beside it.
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.worker_session_start(p_host_id UUID, p_workflows VARCHAR[]);

-- Registers a worker session for a host and flags that host as running.
--
-- Parameters:
--   p_host_id   : id of the hosts row the session belongs to.
--   p_workflows : stored in worker_sessions.workflows.
--   p_queues    : stored in worker_sessions.queues.
--
-- Returns: the freshly inserted worker_sessions row, re-read after the
-- host status update.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.worker_session_start(
    p_host_id   UUID,
    p_workflows VARCHAR[],
    p_queues    VARCHAR[]
)
RETURNS SETOF {SCHEMA_NAME}.worker_sessions
AS $$
DECLARE
    v_socket     VARCHAR;
    v_session_id UUID;
BEGIN
    -- Describe the calling connection as "address:port", taken from this
    -- backend's own pg_stat_activity entry. The value is NULL whenever
    -- client_addr is NULL (e.g. no TCP client address is reported).
    SELECT act.client_addr || ':' || act.client_port
      INTO v_socket
      FROM pg_stat_activity AS act
     WHERE act.pid = pg_backend_pid();

    -- Record the session and remember its generated id.
    INSERT INTO {SCHEMA_NAME}.worker_sessions
        (host_id, client_socket, workflows, queues)
    VALUES
        (p_host_id, v_socket, p_workflows, p_queues)
    RETURNING id INTO v_session_id;

    -- A host with a newly started session is considered running.
    UPDATE {SCHEMA_NAME}.hosts
       SET status = 'running'
     WHERE id = p_host_id;

    -- Hand the stored row back to the caller.
    RETURN QUERY
        SELECT *
          FROM {SCHEMA_NAME}.worker_sessions
         WHERE id = v_session_id;
END;
$$ LANGUAGE plpgsql;
-- Marks a worker session as stopped.
--
-- Parameters:
--   p_worker_session_id : id of the worker_sessions row to stop.
--
-- Returns: nothing. If no row has that id, the UPDATE matches zero rows and
-- the call is a no-op.
--
-- The host's status is not touched here; the _recalculate_host_status
-- trigger on worker_sessions reacts to this UPDATE.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.worker_session_stop(p_worker_session_id UUID)
RETURNS VOID
AS $$
BEGIN
    UPDATE {SCHEMA_NAME}.worker_sessions
       SET status = 'stopped'
     WHERE id = p_worker_session_id;
END;
$$ LANGUAGE plpgsql;
-- wakeup runners after changing hosts
-- when a host changes its status to shutdown, all of its runners should
-- shutdown quickly.
-- Trigger function: after a worker_sessions row is updated, mark its host as
-- 'stopped' when that host has no session left in status 'running'.
--
-- Sessions are matched to the host through worker_sessions.host_id. (Matching
-- on worker_sessions.id would compare a session id with a host id, never find
-- a running session, and stop the host on every session update.)
--
-- Returns NEW; for an AFTER ROW trigger the return value is ignored.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}._recalculate_host_status()
RETURNS TRIGGER
AS $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
          FROM {SCHEMA_NAME}.worker_sessions AS ws
         WHERE ws.host_id = NEW.host_id
           AND ws.status = 'running'
    ) THEN
        -- No running session remains for this host.
        UPDATE {SCHEMA_NAME}.hosts
           SET status = 'stopped'
         WHERE id = NEW.host_id;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- (Re)install the row-level trigger that recomputes a host's status after any
-- update to one of its worker sessions. Drop and create run in a single
-- transaction so the swap is applied as a unit.
BEGIN;

DROP TRIGGER IF EXISTS _recalculate_host_status
    ON {SCHEMA_NAME}.worker_sessions;

CREATE TRIGGER _recalculate_host_status
    AFTER UPDATE
    ON {SCHEMA_NAME}.worker_sessions
    FOR EACH ROW
    EXECUTE PROCEDURE {SCHEMA_NAME}._recalculate_host_status();

COMMIT;