class Que::Locker

Constants

DEFAULT_MAXIMUM_BUFFER_SIZE
DEFAULT_POLL_INTERVAL
DEFAULT_WAIT_PERIOD
DEFAULT_WORKER_PRIORITIES
MESSAGE_RESOLVERS
RESULT_RESOLVERS

Attributes

connection[R]
job_buffer[R]
locks[R]
poll_interval[R]
pollers[R]
queues[R]
thread[R]
workers[R]

Public Class Methods

new( queues: [Que.default_queue], connection_url: nil, listen: true, poll: true, poll_interval: DEFAULT_POLL_INTERVAL, wait_period: DEFAULT_WAIT_PERIOD, maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, worker_priorities: DEFAULT_WORKER_PRIORITIES, on_worker_start: nil, pidfile: nil ) click to toggle source
# File lib/que/locker.rb, line 55
def initialize(
  queues:              [Que.default_queue],
  connection_url:      nil,
  listen:              true,
  poll:                true,
  poll_interval:       DEFAULT_POLL_INTERVAL,
  wait_period:         DEFAULT_WAIT_PERIOD,
  maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
  worker_priorities:   DEFAULT_WORKER_PRIORITIES,
  on_worker_start:     nil,
  pidfile:             nil
)

  # Sanity-check all our arguments, since some users may instantiate Locker
  # directly.
  Que.assert [TrueClass, FalseClass], listen
  Que.assert [TrueClass, FalseClass], poll

  Que.assert Numeric, poll_interval
  Que.assert Numeric, wait_period

  Que.assert Array, worker_priorities
  worker_priorities.each { |p| Que.assert([Integer, NilClass], p) }

  # We assign this globally because we only ever expect one locker to be
  # created per worker process. This can be used by middleware or external
  # code to access the locker during runtime.
  Que.locker = self

  # We use a JobBuffer to track jobs and pass them to workers, and a
  # ResultQueue to receive messages from workers.
  @job_buffer = JobBuffer.new(
    maximum_size: maximum_buffer_size,
    priorities:   worker_priorities.uniq,
  )

  @result_queue = ResultQueue.new

  @stop = false

  Que.internal_log :locker_instantiate, self do
    {
      queues:              queues,
      listen:              listen,
      poll:                poll,
      poll_interval:       poll_interval,
      wait_period:         wait_period,
      maximum_buffer_size: maximum_buffer_size,
      worker_priorities:   worker_priorities,
    }
  end

  # Local cache of which advisory locks are held by this connection.
  @locks = Set.new

  @poll_interval = poll_interval

  if queues.is_a?(Hash)
    @queue_names = queues.keys
    @queues = queues.transform_values do |interval|
      interval || poll_interval
    end
  else
    @queue_names = queues
    @queues = queues.map do |queue_name|
      [queue_name, poll_interval]
    end.to_h
  end

  @wait_period = wait_period.to_f / 1000 # Milliseconds to seconds.

  @workers =
    worker_priorities.map do |priority|
      Worker.new(
        priority:       priority,
        job_buffer:     @job_buffer,
        result_queue:   @result_queue,
        start_callback: on_worker_start,
      )
    end

  # To prevent race conditions, let every worker get into a ready state
  # before starting up the locker thread.
  loop do
    break if job_buffer.waiting_count == workers.count
    sleep 0.001
  end

  # If we weren't passed a specific connection_url, borrow a connection from
  # the pool and derive the connection string from it.
  connection_args =
    if connection_url
      uri = URI.parse(connection_url)

      opts =
        {
          host:     uri.host,
          user:     uri.user,
          password: uri.password,
          port:     uri.port || 5432,
          dbname:   uri.path[1..-1],
        }

      if uri.query
        opts.merge!(Hash[uri.query.split("&").map{|s| s.split('=')}.map{|a,b| [a.to_sym, b]}])
      end

      opts
    else
      Que.pool.checkout do |conn|
        c = conn.wrapped_connection

        {
          host:     c.host,
          user:     c.user,
          password: c.pass,
          port:     c.port,
          dbname:   c.db,
        }
      end
    end

  @connection = Que::Connection.wrap(PG::Connection.open(connection_args))

  @thread =
    Thread.new do
      # An error causing this thread to exit is a bug in Que, which we want
      # to know about ASAP, so propagate the error if it happens.
      Thread.current.abort_on_exception = true

      # Give this thread priority, so it can promptly respond to NOTIFYs.
      Thread.current.priority = 1

      begin
        unless connection_args.has_key?(:application_name)
          @connection.execute(
            "SELECT set_config('application_name', $1, false)",
            ["Que Locker: #{@connection.backend_pid}"]
          )
        end

        Poller.setup(@connection)

        @listener =
          if listen
            Listener.new(connection: @connection)
          end

        @pollers =
          if poll
            @queues.map do |queue_name, interval|
              Poller.new(
                connection:    @connection,
                queue:         queue_name,
                poll_interval: interval,
              )
            end
          end

        work_loop
      ensure
        @connection.wrapped_connection.close
      end
    end

  @pidfile = pidfile
  at_exit { delete_pid }
  write_pid
end

Public Instance Methods

stop() click to toggle source
# File lib/que/locker.rb, line 231
def stop
  @job_buffer.stop
  @stop = true
end
stop!() click to toggle source
# File lib/que/locker.rb, line 225
def stop!
  stop
  wait_for_stop
  delete_pid
end
stopping?() click to toggle source
# File lib/que/locker.rb, line 236
def stopping?
  @stop
end
wait_for_stop() click to toggle source
# File lib/que/locker.rb, line 240
def wait_for_stop
  @thread.join
end

Private Instance Methods

cycle() click to toggle source
# File lib/que/locker.rb, line 298
def cycle
  # Poll at the start of a cycle, so that when the worker starts up we can
  # load up the queue with jobs immediately.
  poll

  # If we got the stop call while we were polling, break before going to
  # sleep.
  return if @stop

  # The main sleeping part of the cycle. If this is a listening locker, this
  # is where we wait for notifications.
  wait

  # Manage any job output we got while we were sleeping.
  handle_results

  # If we haven't gotten the stop signal, cycle again.
  !@stop
end
delete_pid() click to toggle source
# File lib/que/locker.rb, line 531
def delete_pid
  return unless @pidfile

  File.delete(@pidfile) if File.exist?(@pidfile)
end
finish_jobs(metajobs) click to toggle source
# File lib/que/locker.rb, line 476
def finish_jobs(metajobs)
  unlock_jobs(metajobs)
end
handle_results() click to toggle source
# File lib/que/locker.rb, line 396
def handle_results
  messages_by_type =
    @result_queue.clear.group_by{|r| r.fetch(:message_type)}

  messages_by_type.each do |type, messages|
    if resolver = RESULT_RESOLVERS[type]
      instance_exec messages, &resolver
    else
      raise Error, "Unexpected result type: #{type.inspect}"
    end
  end
end
lock_jobs(metajobs) click to toggle source
# File lib/que/locker.rb, line 409
    def lock_jobs(metajobs)
      metajobs.reject! { |m| @locks.include?(m.id) }
      return metajobs if metajobs.empty?

      ids = metajobs.map { |m| m.id.to_i }

      Que.internal_log :locker_locking, self do
        {
          backend_pid: connection.backend_pid,
          ids:         ids,
        }
      end

      materalize_cte = connection.server_version >= 12_00_00

      jobs =
        connection.execute \
          <<-SQL
            WITH jobs AS #{materalize_cte ? 'MATERIALIZED' : ''} (SELECT * FROM que_jobs WHERE id IN (#{ids.join(', ')}))
            SELECT * FROM jobs WHERE pg_try_advisory_lock(id)
          SQL

      jobs_by_id = {}

      jobs.each do |job|
        id = job.fetch(:id)
        mark_id_as_locked(id)
        jobs_by_id[id] = job
      end

      metajobs.keep_if do |metajob|
        if job = jobs_by_id[metajob.id]
          metajob.set_job(job)
          true
        else
          false
        end
      end
    end
mark_id_as_locked(id) click to toggle source
# File lib/que/locker.rb, line 517
def mark_id_as_locked(id)
  Que.assert(@locks.add?(id)) do
    "Tried to lock a job that was already locked: #{id}"
  end
end
poll() click to toggle source
# File lib/que/locker.rb, line 328
def poll
  # Only poll when there are pollers to use (that is, when polling is
  # enabled).
  return unless pollers

  # Figure out what job priorities we have to fill.
  priorities = job_buffer.available_priorities

  # Only poll when there are workers ready for jobs.
  return if priorities.empty?

  all_metajobs = []

  pollers.each do |poller|
    Que.internal_log(:locker_polling, self) {
      {
        priorities: priorities,
        held_locks: @locks.to_a,
        queue: poller.queue,
      }
    }

    if metajobs = poller.poll(priorities: priorities, held_locks: @locks)
      metajobs.sort!
      all_metajobs.concat(metajobs)

      # Update the desired priorities list to take the priorities that we
      # just retrieved into account.
      metajobs.each do |metajob|
        job_priority = metajob.job.fetch(:priority)

        priorities.each do |priority, count|
          if job_priority <= priority
            new_priority = count - 1

            if new_priority <= 0
              priorities.delete(priority)
            else
              priorities[priority] = new_priority
            end

            break
          end
        end
      end

      break if priorities.empty?
    end
  end

  all_metajobs.each { |metajob| mark_id_as_locked(metajob.id) }
  push_jobs(all_metajobs)
end
push_jobs(metajobs) click to toggle source
# File lib/que/locker.rb, line 449
    def push_jobs(metajobs)
      return if metajobs.empty?

      # First check that the jobs are all still visible/available in the DB.
      ids = metajobs.map(&:id)

      verified_ids =
        connection.execute(
          <<-SQL
            SELECT id
            FROM public.que_jobs
            WHERE finished_at IS NULL
              AND expired_at IS NULL
              AND id IN (#{ids.join(', ')})
          SQL
        ).map{|h| h[:id]}.to_set

      good, bad = metajobs.partition{|mj| verified_ids.include?(mj.id)}

      # Need to unlock any low-importance jobs the new ones may displace.
      if displaced = @job_buffer.push(*good)
        bad.concat(displaced)
      end

      unlock_jobs(bad)
    end
shutdown() click to toggle source
# File lib/que/locker.rb, line 318
def shutdown
  unlock_jobs(@job_buffer.clear)
  wait_for_shutdown
  handle_results
end
startup() click to toggle source
# File lib/que/locker.rb, line 283
def startup
  # A previous locker that didn't exit cleanly may have left behind
  # a bad locker record, so clean up before registering.
  connection.execute :clean_lockers
  connection.execute :register_locker, [
    @workers.count,
    "{#{@workers.map(&:priority).map{|p| p || 'NULL'}.join(',')}}",
    Process.pid,
    CURRENT_HOSTNAME,
    !!@listener,
    "{\"#{@queue_names.join('","')}\"}",
    Que.job_schema_version,
  ]
end
unlock_jobs(metajobs) click to toggle source
# File lib/que/locker.rb, line 480
def unlock_jobs(metajobs)
  return if metajobs.empty?

  # Unclear how untrusted input would get passed to this method, but since
  # we need string interpolation here, make sure we only have integers.
  ids = metajobs.map { |job| job.id.to_i }

  Que.internal_log :locker_unlocking, self do
    {
      backend_pid: connection.backend_pid,
      ids:         ids,
    }
  end

  values = ids.join('), (')

  results =
    connection.execute \
      "SELECT pg_advisory_unlock(v.i) FROM (VALUES (#{values})) v (i)"

  results.each do |result|
    Que.assert(result.fetch(:pg_advisory_unlock)) do
      [
        "Tried to unlock a job we hadn't locked!",
        results.inspect,
        ids.inspect,
      ].join(' ')
    end
  end

  ids.each do |id|
    Que.assert(@locks.delete?(id)) do
      "Tried to remove a local lock that didn't exist!: #{id}"
    end
  end
end
wait() click to toggle source
# File lib/que/locker.rb, line 382
def wait
  if l = @listener
    l.wait_for_grouped_messages(@wait_period).each do |type, messages|
      if resolver = MESSAGE_RESOLVERS[type]
        instance_exec messages, &resolver
      else
        raise Error, "Unexpected message type: #{type.inspect}"
      end
    end
  else
    sleep(@wait_period)
  end
end
wait_for_shutdown() click to toggle source
# File lib/que/locker.rb, line 324
def wait_for_shutdown
  @workers.each(&:wait_until_stopped)
end
work_loop() click to toggle source
# File lib/que/locker.rb, line 248
def work_loop
  Que.log(
    level: :debug,
    event: :locker_start,
    queues: @queue_names,
  )

  Que.internal_log :locker_start, self do
    {
      backend_pid: connection.backend_pid,
      worker_priorities: workers.map(&:priority),
      pollers: pollers && pollers.map { |p| [p.queue, p.poll_interval] }
    }
  end

  begin
    @listener.listen if @listener

    startup

    {} while cycle

    Que.log(
      level: :debug,
      event: :locker_stop,
    )

    shutdown
  ensure
    connection.execute :clean_lockers

    @listener.unlisten if @listener
  end
end
write_pid() click to toggle source
# File lib/que/locker.rb, line 523
def write_pid
  return unless @pidfile

  File.open(@pidfile, "w+") do |f|
    f.write(Process.pid.to_s)
  end
end