class Rpush::Daemon::AppRunner

Attributes

app[R]

Public Class Methods

app_ids() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 57
def self.app_ids
  @runners.keys
end
app_running?(app) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 53
def self.app_running?(app)
  @runners.key?(app.id)
end
app_with_id(app_id) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 49
def self.app_with_id(app_id)
  @runners[app_id].app
end
decrement_dispatchers(app, num) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 79
def self.decrement_dispatchers(app, num)
  @runners[app.id].decrement_dispatchers(num)
end
enqueue(notifications) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 15
def self.enqueue(notifications)
  notifications.group_by(&:app_id).each do |app_id, group|
    start_app_with_id(app_id) unless @runners[app_id]
    @runners[app_id].enqueue(group) if @runners[app_id]
  end

  ProcTitle.update
end
increment_dispatchers(app, num) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 83
def self.increment_dispatchers(app, num)
  @runners[app.id].increment_dispatchers(num)
end
new(app) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 94
def initialize(app)
  @app = app
  @loops = []
  @dispatcher_loops = []
end
num_dispatchers_for_app(app) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 74
def self.num_dispatchers_for_app(app)
  runner = @runners[app.id]
  runner ? runner.num_dispatcher_loops : 0
end
start_app(app) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 28
def self.start_app(app)
  Rpush.logger.info("[#{app.name}] Starting #{pluralize(app.connections, 'dispatcher')}... ", true)
  runner = @runners[app.id] = new(app)
  runner.start_dispatchers
  puts Rainbow('✔').green if Rpush.config.foreground && Rpush.config.foreground_logging
  runner.start_loops
rescue StandardError => e
  @runners.delete(app.id)
  Rpush.logger.error("[#{app.name}] Exception raised during startup. Notifications will not be delivered for this app.")
  Rpush.logger.error(e)
  reflect(:error, e)
end
start_app_with_id(app_id) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 24
def self.start_app_with_id(app_id)
  start_app(Rpush::Daemon.store.app(app_id))
end
status() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 87
def self.status
  { app_runners: @runners.values.map(&:status) }
end
stop() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 61
def self.stop
  @runners.values.map(&:stop)
  @runners.clear
end
stop_app(app_id) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 41
def self.stop_app(app_id)
  runner = @runners.delete(app_id)
  if runner
    runner.stop
    log_info("[#{runner.app.name}] Stopped.")
  end
end
total_dispatchers() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 66
def self.total_dispatchers
  @runners.values.sum(&:num_dispatcher_loops)
end
total_queued() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 70
def self.total_queued
  @runners.values.sum(&:queue_size)
end

Public Instance Methods

decrement_dispatchers(num) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 135
def decrement_dispatchers(num)
  num.times { @dispatcher_loops.pop.stop }
end
enqueue(notifications) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 119
def enqueue(notifications)
  if service.batch_deliveries?
    batch_size = (notifications.size / num_dispatcher_loops.to_f).ceil
    notifications.in_groups_of(batch_size, false).each do |batch_notifications|
      batch = Batch.new(batch_notifications)
      queue.push(QueuePayload.new(batch))
    end
  else
    batch = Batch.new(notifications)
    notifications.each do |notification|
      queue.push(QueuePayload.new(batch, notification))
      reflect(:notification_enqueued, notification)
    end
  end
end
increment_dispatchers(num) click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 139
def increment_dispatchers(num)
  num.times { @dispatcher_loops.push(new_dispatcher_loop) }
end
num_dispatcher_loops() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 157
def num_dispatcher_loops
  @dispatcher_loops.size
end
start_dispatchers() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 100
def start_dispatchers
  app.connections.times { @dispatcher_loops.push(new_dispatcher_loop) }
end
start_loops() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 104
def start_loops
  @loops = service.loop_instances(@app)
  @loops.map(&:start)
end
status() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 143
def status
  dispatcher_details = {}

  @dispatcher_loops.each_with_index do |dispatcher_loop, i|
    dispatcher_details[i] = {
      started_at: dispatcher_loop.started_at.iso8601,
      dispatched: dispatcher_loop.dispatch_count,
      thread_status: dispatcher_loop.thread_status
    }
  end

  { app_name: @app.name, dispatchers: dispatcher_details, queued: queue_size }
end
stop() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 109
def stop
  wait_until_idle
  stop_dispatcher_loops
  stop_loops
end
wait_until_idle() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 115
def wait_until_idle
  sleep 0.5 while queue.size > 0
end

Private Instance Methods

new_dispatcher_loop() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 173
def new_dispatcher_loop
  dispatcher = service.new_dispatcher(@app)
  dispatcher_loop = Rpush::Daemon::DispatcherLoop.new(queue, dispatcher)
  dispatcher_loop.start
  dispatcher_loop
end
queue() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 185
def queue
  @queue ||= Queue.new
end
service() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 180
def service
  return @service if defined? @service
  @service = "Rpush::Daemon::#{@app.service_name.camelize}".constantize
end
stop_dispatcher_loops() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 168
def stop_dispatcher_loops
  @dispatcher_loops.map(&:stop)
  @dispatcher_loops.clear
end
stop_loops() click to toggle source
# File lib/rpush/daemon/app_runner.rb, line 163
def stop_loops
  @loops.map(&:stop)
  @loops = []
end