-- worker_session_start: starts or reuses a worker_session ---------------------

-- Remove the older two-argument signature (UUID, VARCHAR[]) if it is still
-- installed; the definition in this file takes three arguments.
DROP FUNCTION IF EXISTS {SCHEMA_NAME}.worker_session_start(
  UUID,
  VARCHAR[]
);

-- Inserts a new worker_sessions row for a host and sets that host's status to
-- 'running'.
--
-- Parameters:
--   p_host_id   - id of the hosts row; stored in the session's host_id column
--   p_workflows - stored in the session's workflows column
--   p_queues    - stored in the session's queues column
--
-- Returns the inserted worker_sessions row, read back after the hosts update.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.worker_session_start(
  p_host_id UUID,
  p_workflows VARCHAR[],
  p_queues VARCHAR[]
)
RETURNS SETOF {SCHEMA_NAME}.worker_sessions
AS $$
DECLARE
  v_socket VARCHAR;
  v_new_session_id UUID;
BEGIN
  -- Describe the calling connection as "addr:port", taken from this backend's
  -- own pg_stat_activity entry. The result is NULL whenever client_addr is
  -- NULL (PostgreSQL reports that for Unix-socket connections).
  SELECT a.client_addr || ':' || a.client_port
    INTO v_socket
    FROM pg_stat_activity AS a
    WHERE a.pid = pg_backend_pid();

  INSERT INTO {SCHEMA_NAME}.worker_sessions (host_id, client_socket, workflows, queues)
    VALUES (p_host_id, v_socket, p_workflows, p_queues)
    RETURNING id INTO v_new_session_id;

  UPDATE {SCHEMA_NAME}.hosts
    SET status = 'running'
    WHERE id = p_host_id;

  -- Re-read the row by id so the caller receives every column of the session.
  RETURN QUERY
    SELECT *
      FROM {SCHEMA_NAME}.worker_sessions
      WHERE id = v_new_session_id;
END;
$$ LANGUAGE plpgsql;

-- Marks the given worker session as stopped.
--
-- Parameters:
--   p_worker_session_id - id of the worker_sessions row to stop
--
-- Returns nothing. When the id matches no row, nothing is updated and no
-- error is raised. The hosts table is not touched here.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}.worker_session_stop(p_worker_session_id UUID) RETURNS VOID AS $$
BEGIN
  UPDATE {SCHEMA_NAME}.worker_sessions
    SET status = 'stopped'
    WHERE id = p_worker_session_id;
END;
$$ LANGUAGE plpgsql;

-- wakeup runners after changing hosts

-- when a host changes its status to shutdown, all of its runners should
-- shut down quickly.

-- Trigger function for row-level UPDATEs on worker_sessions.
--
-- When the host that owns the updated session has no session left in status
-- 'running', that host's status is set to 'stopped'. Otherwise the host is
-- left untouched.
--
-- Returns NEW unchanged.
CREATE OR REPLACE FUNCTION {SCHEMA_NAME}._recalculate_host_status() RETURNS TRIGGER AS $$

BEGIN
  -- Look for running sessions of the same host. The filter must be on
  -- worker_sessions.host_id: comparing the session's own id with NEW.host_id
  -- never matches, which would stop the host on every session update.
  IF NOT EXISTS (
    SELECT 1
      FROM {SCHEMA_NAME}.worker_sessions AS ws
      WHERE ws.host_id = NEW.host_id
        AND ws.status = 'running'
  ) THEN
    UPDATE {SCHEMA_NAME}.hosts
      SET status = 'stopped'
      WHERE id = NEW.host_id;
  END IF;

  RETURN NEW;
END;

$$ LANGUAGE plpgsql;

-- (Re)install the row-level trigger that runs _recalculate_host_status()
-- after every UPDATE on worker_sessions. The drop and the create are wrapped
-- in a single transaction.
BEGIN;

DROP TRIGGER IF EXISTS _recalculate_host_status
  ON {SCHEMA_NAME}.worker_sessions;

CREATE TRIGGER _recalculate_host_status
  AFTER UPDATE ON {SCHEMA_NAME}.worker_sessions
  FOR EACH ROW
  EXECUTE PROCEDURE {SCHEMA_NAME}._recalculate_host_status();

COMMIT;