class GoodJob::Notifier

Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.

Notifiers can emit NOTIFY messages through Postgres. A notifier will LISTEN for messages by creating a background thread that runs in an instance of Concurrent::ThreadPoolExecutor. When a message is received, the notifier passes the message to each of its recipients.

Constants

AdapterCannotListenError

Raised if the Database adapter does not implement LISTEN.

CHANNEL

Default Postgres channel for LISTEN/NOTIFY

EXECUTOR_OPTIONS

Defaults for instance of Concurrent::ThreadPoolExecutor

RECONNECT_INTERVAL

Seconds to wait if database cannot be connected to

WAIT_INTERVAL

Seconds to block while LISTENing for a message

Attributes

executor[R]
recipients[R]

List of recipients that will receive notifications. @return [Array<#call, Array(Object, Symbol)>]

Public Class Methods

new(*recipients) click to toggle source

@param recipients [Array<#call, Array(Object, Symbol)>]

   # File lib/good_job/notifier.rb
# Builds a notifier, adds it to the class-level list of instances, creates
# its executor and schedules the first listen task.
#
# @param recipients [Array<#call, Array(Object, Symbol)>]
#   callables, or +[object, method_name]+ pairs, to hand each message to
def initialize(*recipients)
  # The flag starts out false; the listen task raises it once LISTEN is issued.
  @listening = Concurrent::AtomicBoolean.new(false)
  @recipients = Concurrent::Array.new(recipients)

  registry = self.class.instances
  registry << self

  create_executor
  listen
end
notify(message) click to toggle source

Send a message via Postgres NOTIFY @param message [#to_json]

   # File lib/good_job/notifier.rb
# Send a message via Postgres NOTIFY on {CHANNEL}.
#
# The message is serialized with +to_json+ and quoted by the connection
# before being interpolated into the statement.
#
# @param message [#to_json]
# @return the result of the connection's +exec_query+
def self.notify(message)
  connection = Job.connection
  payload = connection.quote(message.to_json)

  # Build the one-line statement directly. Squishing the whole statement
  # (as a heredoc + String#squish would) also rewrites whitespace *inside*
  # the quoted payload, so "a  b" would arrive as "a b".
  connection.exec_query("NOTIFY #{CHANNEL}, #{payload}")
end

Public Instance Methods

listen_observer(_time, _result, thread_error) click to toggle source

Invoked on completion of ThreadPoolExecutor task @!visibility private @return [void]

    # File lib/good_job/notifier.rb
# Observer callback registered on the listen task.
#
# Does nothing when the adapter cannot LISTEN. Otherwise reports any error,
# and, unless the notifier has been shut down, schedules another listen task:
# after RECONNECT_INTERVAL for database connection/statement errors, with the
# default delay in every other case.
#
# @!visibility private
# @param _time [Object] unused
# @param _result [Object] unused
# @param thread_error [Exception, nil] error raised by the task, if any
# @return [void]
def listen_observer(_time, _result, thread_error)
  return if thread_error.is_a?(AdapterCannotListenError)

  if thread_error
    handler = GoodJob.on_thread_error
    handler.call(thread_error) if handler.respond_to?(:call)
    ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error })
  end

  return if shutdown?

  case thread_error
  when ActiveRecord::ConnectionNotEstablished, ActiveRecord::StatementInvalid
    # Database trouble: back off before trying to listen again.
    listen(delay: RECONNECT_INTERVAL)
  else
    listen
  end
end
listening?() click to toggle source

Tests whether the notifier is active and listening for new messages. @return [true, false]

   # File lib/good_job/notifier.rb
# Tests whether the notifier is active and listening for new messages.
#
# @return [true, false] current value of the listening flag
def listening?
  @listening.value
end
restart(timeout: -1) click to toggle source

Restart the notifier. If it is already shut down, it is simply started; otherwise it is shut down first and then started again. @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}. @return [void]

    # File lib/good_job/notifier.rb
# Restart the notifier: shut it down first when it is running, then build a
# fresh executor and schedule a new listen task.
#
# @param timeout [nil, Numeric] forwarded to {#shutdown}
# @return [void]
def restart(timeout: -1)
  if running?
    shutdown(timeout: timeout)
  end

  create_executor
  listen
end
shutdown(timeout: -1) click to toggle source

Shut down the notifier. This stops the background LISTENing thread. Use {#shutdown?} to determine whether threads have stopped. @param timeout [Numeric, nil] Seconds to wait for active threads.

* +nil+, the notifier will trigger a shutdown but not wait for it to complete.
* +-1+, the notifier will wait until the shutdown is complete.
* +0+, the notifier will immediately shutdown and stop any threads.
* A positive number will wait that many seconds before stopping any remaining active threads.

@return [void]

   # File lib/good_job/notifier.rb
# Shut down the notifier's executor, stopping the background LISTENing thread.
#
# @param timeout [Numeric, nil] seconds to wait for active threads:
#   +nil+ triggers the shutdown without waiting; a negative value waits
#   without a limit; +0+ or a positive number waits that long and then kills
#   the executor if it has not terminated.
# @return [void]
def shutdown(timeout: -1)
  return if executor.nil? || executor.shutdown?

  executor.shutdown if executor.running?

  # Nothing left to wait for, or the caller asked not to wait.
  return unless executor.shuttingdown? && timeout

  # A nil wait means "no limit" for wait_for_termination.
  wait_seconds = timeout.negative? ? nil : timeout
  terminated = executor.wait_for_termination(wait_seconds)
  executor.kill unless terminated
end

Private Instance Methods

create_executor() click to toggle source
    # File lib/good_job/notifier.rb
# Replaces @executor with a new thread pool built from EXECUTOR_OPTIONS.
#
# @return [Concurrent::ThreadPoolExecutor] the newly created executor
def create_executor
  options = EXECUTOR_OPTIONS
  @executor = Concurrent::ThreadPoolExecutor.new(options)
end
listen(delay: 0) click to toggle source
    # File lib/good_job/notifier.rb
# Schedules the background LISTEN task on the notifier's executor.
#
# The task issues LISTEN on CHANNEL, then keeps calling wait_for_notify for
# as long as the executor reports running?, passing each JSON-parsed payload
# to every recipient. listen_observer is registered as the task's observer.
#
# @param delay [Numeric] delay handed to Concurrent::ScheduledTask
# @return the result of ScheduledTask#execute
137 def listen(delay: 0)
      # Recipients, executor and listening flag are handed to the task via args:
      # and received as block parameters.
138   future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
139     with_listen_connection do |conn|
140       ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
141         conn.async_exec("LISTEN #{CHANNEL}").clear
142       end
143 
          # NOTE(review): presumably lets Rails code loading proceed in other
          # threads while this one blocks in wait_for_notify — confirm.
144       ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
            # The flag is raised only after LISTEN has been issued.
145         thr_listening.make_true
146         while thr_executor.running?
              # Blocks for up to WAIT_INTERVAL seconds per iteration, so the
              # running? check above is re-evaluated regularly.
147           conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
                # Ignore notifications that arrive on any other channel.
148             next unless channel == CHANNEL
149 
150             ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
151             parsed_payload = JSON.parse(payload, symbolize_names: true)
152             thr_recipients.each do |recipient|
                  # A recipient is either an [object, method_name] pair or a
                  # bare callable, which is invoked through :call.
153               target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
154               target.send(method_name, parsed_payload)
155             end
156           end
157         end
158       end
159     ensure
          # Runs whether the block finished normally or raised: lower the flag
          # first, then UNLISTEN on the same connection.
160       thr_listening.make_false
161       ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
162         conn.async_exec("UNLISTEN *").clear
163       end
164     end
165   end
166 
      # The observer must be attached before the task is started.
167   future.add_observer(self, :listen_observer)
168   future.execute
169 end
with_listen_connection() { |pg_conn| ... } click to toggle source
    # File lib/good_job/notifier.rb
# Checks a connection out of Job's pool, removes it from the pool, and
# yields its raw connection for the duration of the block. The connection is
# disconnected afterwards, whether or not the block raised.
#
# @yieldparam pg_conn [Object] raw connection responding to +wait_for_notify+
# @raise [AdapterCannotListenError] when the raw connection cannot
#   +wait_for_notify+
# @return the value of the block
def with_listen_connection
  pool = Job.connection_pool
  # tap keeps the assignment after the removal, so ar_conn stays nil (and the
  # ensure below is a no-op) if removal raises.
  ar_conn = pool.checkout.tap { |checked_out| pool.remove(checked_out) }

  raw = ar_conn.raw_connection
  unless raw.respond_to?(:wait_for_notify)
    raise AdapterCannotListenError
  end

  app_name = raw.escape_identifier(self.class.name)
  raw.async_exec("SET application_name = #{app_name}").clear
  yield raw
ensure
  ar_conn&.disconnect!
end
182 end