class GoodJob::Notifier
Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.
Notifiers can emit NOTIFY messages through Postgres. A notifier will LISTEN for messages by creating a background thread that runs in an instance of Concurrent::ThreadPoolExecutor
. When a message is received, the notifier passes the message to each of its recipients.
Constants
- AdapterCannotListenError
Raised if the Database adapter does not implement LISTEN.
- CHANNEL
Default Postgres channel for LISTEN/NOTIFY
- EXECUTOR_OPTIONS
Defaults for instance of Concurrent::ThreadPoolExecutor
- RECONNECT_INTERVAL
Seconds to wait if database cannot be connected to
- WAIT_INTERVAL
Seconds to block while LISTENing for a message
Attributes
List of recipients that will receive notifications. @return [Array<#call, Array(Object, Symbol)>]
Public Class Methods
@param recipients [Array<#call, Array(Object, Symbol)>]
# File lib/good_job/notifier.rb 53 def initialize(*recipients) 54 @recipients = Concurrent::Array.new(recipients) 55 @listening = Concurrent::AtomicBoolean.new(false) 56 57 self.class.instances << self 58 59 create_executor 60 listen 61 end
Send a message via Postgres NOTIFY @param message [#to_json]
# File lib/good_job/notifier.rb 41 def self.notify(message) 42 connection = Job.connection 43 connection.exec_query <<~SQL.squish 44 NOTIFY #{CHANNEL}, #{connection.quote(message.to_json)} 45 SQL 46 end
Public Instance Methods
Invoked on completion of ThreadPoolExecutor task @!visibility private @return [void]
# File lib/good_job/notifier.rb 112 def listen_observer(_time, _result, thread_error) 113 return if thread_error.is_a? AdapterCannotListenError 114 115 if thread_error 116 GoodJob.on_thread_error.call(thread_error) if GoodJob.on_thread_error.respond_to?(:call) 117 ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error }) 118 end 119 120 return if shutdown? 121 122 if thread_error.is_a?(ActiveRecord::ConnectionNotEstablished) || thread_error.is_a?(ActiveRecord::StatementInvalid) 123 listen(delay: RECONNECT_INTERVAL) 124 else 125 listen 126 end 127 end
Tests whether the notifier is active and listening for new messages. @return [true, false, nil]
# File lib/good_job/notifier.rb 65 def listening? 66 @listening.true? 67 end
Restart the notifier. When shutdown, start; or shutdown and start. @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}. @return [void]
# File lib/good_job/notifier.rb 103 def restart(timeout: -1) 104 shutdown(timeout: timeout) if running? 105 create_executor 106 listen 107 end
Shut down the notifier. This stops the background LISTENing thread. Use {#shutdown?} to determine whether threads have stopped. @param timeout [Numeric, nil] Seconds to wait for active threads.
* +nil+, the scheduler will trigger a shutdown but not wait for it to complete. * +-1+, the scheduler will wait until the shutdown is complete. * +0+, the scheduler will immediately shutdown and stop any threads. * A positive number will wait that many seconds before stopping any remaining active threads.
@return [void]
# File lib/good_job/notifier.rb 88 def shutdown(timeout: -1) 89 return if executor.nil? || executor.shutdown? 90 91 executor.shutdown if executor.running? 92 93 if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause 94 executor_wait = timeout.negative? ? nil : timeout 95 executor.kill unless executor.wait_for_termination(executor_wait) 96 end 97 end
Private Instance Methods
# File lib/good_job/notifier.rb 133 def create_executor 134 @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS) 135 end
# File lib/good_job/notifier.rb 137 def listen(delay: 0) 138 future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| 139 with_listen_connection do |conn| 140 ActiveSupport::Notifications.instrument("notifier_listen.good_job") do 141 conn.async_exec("LISTEN #{CHANNEL}").clear 142 end 143 144 ActiveSupport::Dependencies.interlock.permit_concurrent_loads do 145 thr_listening.make_true 146 while thr_executor.running? 147 conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| 148 next unless channel == CHANNEL 149 150 ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) 151 parsed_payload = JSON.parse(payload, symbolize_names: true) 152 thr_recipients.each do |recipient| 153 target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] 154 target.send(method_name, parsed_payload) 155 end 156 end 157 end 158 end 159 ensure 160 thr_listening.make_false 161 ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do 162 conn.async_exec("UNLISTEN *").clear 163 end 164 end 165 end 166 167 future.add_observer(self, :listen_observer) 168 future.execute 169 end
# File lib/good_job/notifier.rb 171 def with_listen_connection 172 ar_conn = Job.connection_pool.checkout.tap do |conn| 173 Job.connection_pool.remove(conn) 174 end 175 pg_conn = ar_conn.raw_connection 176 raise AdapterCannotListenError unless pg_conn.respond_to? :wait_for_notify 177 178 pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear 179 yield pg_conn 180 ensure 181 ar_conn&.disconnect! 182 end