class PerfectQueue::Backend::RDBCompatBackend

Constants

DEFAULT_DELETE_INTERVAL
DELETE_OFFSET

timeout model

0 --- now-1Bs --- retention --|--- now -- alive ------- FUTURE

~~~~~~~^  to be deleted ^      |~~~^~~~       ^ running or in-queue
 DELETE          13_0000_0000->|   to be acquired

NOTE: this architecture introduces a Year 2042 problem.

EVENT_HORIZON
GZIP_MAGIC_BYTES
KEEPALIVE
LOCK_RETRY_INITIAL_INTERVAL
LOCK_RETRY_MAX_INTERVAL
LOCK_WAIT_TIMEOUT
MAX_RETRY

Attributes

db[R]

Public Class Methods

new(client, config) click to toggle source
Calls superclass method PerfectQueue::BackendHelper::new
# File lib/perfectqueue/backend/rdb_compat.rb, line 42
# Sets up the backend: validates configuration, connects to MySQL via
# Sequel, and prepares GET_LOCK/RELEASE_LOCK lambdas used to serialize
# acquire operations across workers.
#
# client - PerfectQueue client (consumed by the BackendHelper superclass)
# config - Hash; :url and :table are required. Optional keys used here:
#          :pq_connect_timeout, :connect_timeout, :sslca,
#          :use_connection_pooling, :disable_resource_limit,
#          :cleanup_interval
#
# Raises ConfigError when :table is missing or the URL is not mysql2.
def initialize(client, config)
  super

  @pq_connect_timeout = config.fetch(:pq_connect_timeout, 20)
  url = config[:url]
  @table = config[:table]
  unless @table
    raise ConfigError, ":table option is required"
  end

  if /\Amysql2:/i =~ url
    # One connection per backend instance; SSL CA may be given for TLS.
    options = {max_connections: 1, sslca: config[:sslca]}
    options[:connect_timeout] = config.fetch(:connect_timeout, 3)
    @db = Sequel.connect(url, options)
    if config.fetch(:use_connection_pooling, nil) != nil
      @use_connection_pooling = !!config[:use_connection_pooling]
    else
      # Default: keep connections alive only when TLS is configured
      # (the handshake is expensive); otherwise reconnect per operation.
      @use_connection_pooling = !!config[:sslca]
    end
    # Named-lock acquisition with bounded exponential backoff.
    # NOTE(review): @table is interpolated directly into SQL; assumed to
    # come from trusted configuration, not user input.
    @table_lock = lambda {
      locked = nil
      interval = LOCK_RETRY_INITIAL_INTERVAL
      loop do
        @db.fetch("SELECT GET_LOCK('#{@table}', #{LOCK_WAIT_TIMEOUT}) locked") do |row|
          locked = true if row[:locked] == 1
        end
        break if locked
        sleep interval
        interval = [interval * 2, LOCK_RETRY_MAX_INTERVAL].min
      end
    }
    @table_unlock = lambda {
      @db.run("DO RELEASE_LOCK('#{@table}')")
    }
  else
    raise ConfigError, "only 'mysql' is supported"
  end

  @last_time = Time.now.to_i
  @mutex = Mutex.new

  connect {
    # connection test
  }

  @disable_resource_limit = config[:disable_resource_limit]
  @cleanup_interval = config[:cleanup_interval] || DEFAULT_DELETE_INTERVAL
  # If cleanup_interval > max_request_per_child / max_acquire,
  # some processes won't run DELETE query.
  # (it's not an issue when there are enough workers)
  # Randomize the first countdown so workers don't all DELETE in lockstep.
  @cleanup_interval_count = @cleanup_interval > 0 ? rand(@cleanup_interval) : 0
end

Public Instance Methods

acquire(alive_time, max_acquire, options) click to toggle source

> [AcquiredTask]

# File lib/perfectqueue/backend/rdb_compat.rb, line 198
# Acquires up to max_acquire runnable tasks, extending their ownership
# until now + alive_time. Periodically purges long-expired rows first
# (every @cleanup_interval acquisitions).
#
# @return [Array<AcquiredTask>, nil]
def acquire(alive_time, max_acquire, options)
  now = (options[:now] || Time.now).to_i
  next_timeout = now + alive_time

  # Run the periodic cleanup when the countdown has elapsed.
  if @cleanup_interval_count <= 0
    connect {
      started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @db["DELETE FROM `#{@table}` WHERE timeout <= ?", now-DELETE_OFFSET].delete
      @cleanup_interval_count = @cleanup_interval
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
      STDERR.puts "PQ:delete from #{@table}:%6f sec" % [elapsed]
    }
  end

  if @disable_resource_limit
    acquire_without_resource(next_timeout, now, max_acquire)
  else
    acquire_with_resource(next_timeout, now, max_acquire)
  end
end
compress_data(data, compression) click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 156
# Optionally gzip-compresses serialized task data.
#
# data        - String payload (typically JSON)
# compression - 'gzip' to compress; any other value passes data through
#
# @return [String, Sequel::SQL::Blob] the (possibly compressed) payload
def compress_data(data, compression)
  return data unless compression == 'gzip'

  buffer = StringIO.new
  buffer.set_encoding(Encoding::ASCII_8BIT)
  writer = Zlib::GzipWriter.new(buffer)
  begin
    writer.write(data)
  ensure
    writer.close
  end
  # Wrap in a Blob so Sequel binds it as binary, not text.
  Sequel::SQL::Blob.new(buffer.string)
end
finish(task_token, retention_time, options) click to toggle source

> nil

# File lib/perfectqueue/backend/rdb_compat.rb, line 224
# Marks a task as finished: shifts its timeout into the to-be-deleted
# region (kept for retention_time seconds) and clears created_at and
# resource so it can no longer be acquired.
#
# @return [nil]
# @raise [IdempotentAlreadyFinishedError] when no live row matched
def finish(task_token, retention_time, options)
  now = (options[:now] || Time.now).to_i
  delete_timeout = now - DELETE_OFFSET + retention_time
  key = task_token.key

  connect {
    updated = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL", delete_timeout, key].update
    unless updated > 0
      raise IdempotentAlreadyFinishedError, "task key=#{key} does not exist or already finished."
    end
  }
  nil
end
force_finish(key, retention_time, options) click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 219
# Finishes a task identified only by its key, without an acquired token.
def force_finish(key, retention_time, options)
  token = Token.new(key)
  finish(token, retention_time, options)
end
get_task_metadata(key, options) click to toggle source

> TaskStatus

# File lib/perfectqueue/backend/rdb_compat.rb, line 125
# Fetches metadata for the task identified by key.
#
# @return [TaskMetadata]
# @raise [NotFoundError] when no row exists for the key
def get_task_metadata(key, options)
  now = (options[:now] || Time.now).to_i

  connect {
    row = @db.fetch("SELECT timeout, data, created_at, resource, max_running FROM `#{@table}` WHERE id=? LIMIT 1", key).first
    unless row
      # Fixed message typo: "does no exist" -> "does not exist"
      raise NotFoundError, "task key=#{key} does not exist"
    end
    attributes = create_attributes(now, row)
    return TaskMetadata.new(@client, key, attributes)
  }
end
heartbeat(task_token, alive_time, options) click to toggle source

> nil

# File lib/perfectqueue/backend/rdb_compat.rb, line 239
# Extends the lease on an acquired task to now + alive_time, optionally
# replacing its data payload.
#
# @return [nil]
# @raise [PreemptedError] when the task no longer exists or was finished
#   (created_at IS NULL) by another worker
def heartbeat(task_token, alive_time, options)
  now = (options[:now] || Time.now).to_i
  next_timeout = now + alive_time
  key = task_token.key
  data = options[:data]

  # Build the UPDATE incrementally; params[0] is the same String object as
  # sql, so the << mutations below are reflected in the query.
  sql = "UPDATE `#{@table}` SET timeout=?"
  params = [sql, next_timeout]
  if data
    sql << ", data=?"
    params << compress_data(data.to_json, options[:compression])
  end
  sql << " WHERE id=? AND created_at IS NOT NULL"
  params << key

  connect {
    n = @db[*params].update
    if n <= 0
      # Nothing updated: distinguish missing row vs finished row.
      row = @db.fetch("SELECT id, timeout, created_at FROM `#{@table}` WHERE id=? LIMIT 1", key).first
      if row == nil
        raise PreemptedError, "task key=#{key} does not exist or preempted."
      elsif row[:created_at] == nil
        raise PreemptedError, "task key=#{key} preempted."
      else # row[:timeout] == next_timeout
        # ok
      end
    end
  }
  nil
end
init_database(options) click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 102
      # Creates the queue table and its timeout index if they do not exist.
      # Pass options[:force] to drop and recreate the table first.
      def init_database(options)
        statements = []
        statements << "DROP TABLE IF EXISTS `#{@table}`" if options[:force]
        statements << <<-SQL
          CREATE TABLE IF NOT EXISTS `#{@table}` (
            id VARCHAR(255) NOT NULL,
            timeout INT NOT NULL,
            data LONGBLOB NOT NULL,
            created_at INT,
            resource VARCHAR(255),
            max_running INT,
            /* CONNECTION_ID() can be 64bit: https://bugs.mysql.com/bug.php?id=19806 */
            owner BIGINT(21) UNSIGNED NOT NULL DEFAULT 0,
            PRIMARY KEY (id)
          )
          SQL
        statements << "CREATE INDEX `index_#{@table}_on_timeout` ON `#{@table}` (`timeout`)"
        connect {
          statements.each {|stmt| @db.run(stmt) }
        }
      end
list(options) { |task| ... } click to toggle source

yield [TaskWithMetadata]

# File lib/perfectqueue/backend/rdb_compat.rb, line 144
# Iterates over every task row, ordered by timeout ascending.
#
# yields [TaskWithMetadata]
def list(options, &block)
  now = (options[:now] || Time.now).to_i

  connect {
    @db.fetch("SELECT id, timeout, data, created_at, resource, max_running FROM `#{@table}` ORDER BY timeout ASC") do |row|
      block.call TaskWithMetadata.new(@client, row[:id], create_attributes(now, row))
    end
  }
end
preempt(key, alive_time, options) click to toggle source

> AcquiredTask

# File lib/perfectqueue/backend/rdb_compat.rb, line 139
# Preemption is not implemented by this backend.
#
# @raise [NotSupportedError] always
def preempt(key, alive_time, options)
  raise NotSupportedError, "preempt is not supported by rdb_compat backend"
end
release(task_token, alive_time, options) click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 270
# Releases an acquired task back to the queue. Implemented via heartbeat:
# the timeout is set to now + alive_time, after which other workers may
# acquire the task again.
def release(task_token, alive_time, options)
  heartbeat(task_token, alive_time, options)
end
submit(key, type, data, options) click to toggle source

> Task

# File lib/perfectqueue/backend/rdb_compat.rb, line 173
# Inserts a new task row with the given key.
#
# @return [Task]
# @raise [IdempotentAlreadyExistsError] when the key already exists
def submit(key, type, data, options)
  now = (options[:now] || Time.now).to_i
  now = 1 if now < 1  # 0 means cancel requested
  run_at = (options[:run_at] || now).to_i
  user = options[:user]
  user = user.to_s if user
  max_running = options[:max_running]

  # The task type rides inside the JSON payload.
  payload = data ? data.dup : {}
  payload['type'] = type
  blob = compress_data(payload.to_json, options[:compression])

  connect {
    begin
      @db[
        "INSERT INTO `#{@table}` (id, timeout, data, created_at, resource, max_running) VALUES (?, ?, ?, ?, ?, ?)",
        key, run_at, blob, now, user, max_running
      ].insert
      return Task.new(@client, key)
    rescue Sequel::UniqueConstraintViolation
      raise IdempotentAlreadyExistsError, "task key=#{key} already exists"
    end
  }
end

Protected Instance Methods

acquire_with_resource(next_timeout, now, max_acquire) click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 442
      # Acquires tasks while honoring per-resource max_running limits.
      # A subquery counts currently-running tasks per resource; candidates
      # are ordered by weight (free capacity ratio) so saturated resources
      # are deprioritized, then the chosen rows are claimed by UPDATE.
      #
      # @return [Array<AcquiredTask>, nil] nil when nothing was selected or
      #   another worker claimed the rows between SELECT and UPDATE
      def acquire_with_resource(next_timeout, now, max_acquire)
        t0 = nil
        tasks = nil
        sql = <<SQL
SELECT id, timeout, data, created_at, resource, max_running, IFNULL(max_running, 1) / (IFNULL(running, 0) + 1) AS weight
FROM `#{@table}`
LEFT JOIN (
  SELECT resource AS res, COUNT(1) AS running
  FROM `#{@table}` AS T
  WHERE timeout > ? AND created_at IS NOT NULL AND resource IS NOT NULL
  GROUP BY resource
) AS R ON resource = res
WHERE #{EVENT_HORIZON} < timeout AND timeout <= ?
      AND created_at IS NOT NULL
      AND (max_running-running IS NULL OR max_running-running > 0)
ORDER BY weight DESC, timeout ASC
LIMIT ?
SQL
        connect_locked do
          t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          tasks = []
          @db.fetch(sql, now, now, max_acquire) do |row|
            attributes = create_attributes(nil, row)
            task_token = Token.new(row[:id])
            task = AcquiredTask.new(@client, row[:id], attributes, task_token)
            tasks.push task
          end
          return nil if tasks.empty?

          # Claim the selected rows; the timeout <= now guard rejects rows
          # re-acquired by another worker since the SELECT above.
          sql = "UPDATE `#{@table}` FORCE INDEX (PRIMARY) SET timeout=? WHERE timeout <= ? AND id IN ("
          params = [sql, next_timeout, now]
          params.concat tasks.map(&:key)
          sql << '?,' * tasks.size
          sql.chop!
          sql << ") AND created_at IS NOT NULL"

          n = @db[*params].update
          if n != tasks.size
            # preempted
            return nil
          end
        end
        @cleanup_interval_count -= 1
        return tasks
      ensure
        # Timing log runs on both success and early return; tasks is nil
        # only if SELECT never ran.
        STDERR.puts "PQ:acquire from #{@table}:%6f sec (%d tasks)" % \
          [Process.clock_gettime(Process::CLOCK_MONOTONIC)-t0, tasks.size] if tasks
      end
acquire_without_resource(next_timeout, now, max_acquire) click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 398
      # Acquires tasks ignoring resource limits: a single UPDATE claims the
      # oldest runnable rows (tagged with next_timeout + CONNECTION_ID()),
      # then a SELECT reads back exactly the rows this connection claimed.
      #
      # @return [Array<AcquiredTask>, nil] nil when no row was claimed
      def acquire_without_resource(next_timeout, now, max_acquire)
        # MySQL's CONNECTION_ID() is a 64bit unsigned integer from the
        # server's internal thread ID counter. It is unique while the MySQL
        # server is running.
        # https://bugs.mysql.com/bug.php?id=19806
        #
        # An acquired task is marked with next_timeout and CONNECTION_ID().
        # Therefore while alive_time is not changed and we don't restart
        # the server in 1 second, they won't conflict.
        update_sql = <<SQL
UPDATE `#{@table}`
  JOIN (
  SELECT id
    FROM `#{@table}` FORCE INDEX (`index_#{@table}_on_timeout`)
   WHERE #{EVENT_HORIZON} < timeout AND timeout <= :now
   ORDER BY timeout ASC
      LIMIT :max_acquire) AS t1 USING(id)
   SET timeout=:next_timeout, owner=CONNECTION_ID()
SQL
        select_sql = <<SQL
SELECT id, timeout, data, created_at, resource
  FROM `#{@table}`
 WHERE timeout = ? AND owner = CONNECTION_ID()
SQL
        t0 = 0
        connect_locked do
          t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          n = @db[update_sql, next_timeout: next_timeout, now: now, max_acquire: max_acquire].update
          # The claim is durable after the UPDATE, so the named lock can be
          # released before reading the rows back.
          @table_unlock.call
          STDERR.puts "PQ:acquire from #{@table}:%6f sec (%d tasks)" % [Process.clock_gettime(Process::CLOCK_MONOTONIC)-t0,n]
          return nil if n <= 0

          tasks = []
          @db.fetch(select_sql, next_timeout) do |row|
            attributes = create_attributes(nil, row)
            task_token = Token.new(row[:id])
            task = AcquiredTask.new(@client, row[:id], attributes, task_token)
            tasks.push task
          end
          @cleanup_interval_count -= 1
          return tasks
        end
      end
connect() { || ... } click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 294
# Yields with the connection mutex held, retrying connection errors and
# MySQL deadlocks (both bounded by MAX_RETRY; connection errors also by
# @pq_connect_timeout). Recycles the connection after KEEPALIVE seconds
# of inactivity, and disconnects after every call unless pooling is on.
def connect
  now = Time.now.to_i
  tmax = now + @pq_connect_timeout
  @mutex.synchronize do
    # keepalive_timeout
    @db.disconnect if now - @last_time > KEEPALIVE

    count = 0
    begin
      yield
      @last_time = now
    rescue Sequel::DatabaseConnectionError
      # Retry with a fixed 2s pause until the retry budget or the connect
      # deadline is exhausted, then propagate.
      if (count += 1) < MAX_RETRY && tmax > Time.now.to_i
        STDERR.puts "#{$!}\n  retrying."
        sleep 2
        retry
      end
      STDERR.puts "#{$!}\n  abort."
      raise
    rescue
      # workaround for "Mysql2::Error: Deadlock found when trying to get lock; try restarting transaction" error
      if $!.to_s.include?('try restarting transaction')
        err = ([$!] + $!.backtrace.map {|bt| "  #{bt}" }).join("\n")
        count += 1
        if count < MAX_RETRY
          # Random sub-second sleep de-synchronizes competing workers.
          STDERR.puts err + "\n  retrying."
          sleep rand
          retry
        else
          STDERR.puts err + "\n  abort."
        end
      else
        err = $!
      end

      # Unknown failure (or retries exhausted): drop the connection so the
      # next call starts from a clean state, then re-raise.
      STDERR.puts "disconnects current connection: #{err}"
      @db.disconnect

      raise
    ensure
      # connection_pooling
      @db.disconnect if !@use_connection_pooling
    end
  end
end
connect_locked() { || ... } click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 275
# Runs the block while holding the MySQL named lock (when @table_lock is
# configured). With connection pooling the lock must be released
# explicitly in the ensure; without pooling, the disconnect performed by
# #connect drops the session and its lock implicitly.
def connect_locked
  connect {
    locked = false

    begin
      if @table_lock
        @table_lock.call
        locked = true
      end

      return yield
    ensure
      if @use_connection_pooling && locked
        @table_unlock.call
      end
    end
  }
end
create_attributes(now, row) click to toggle source
# File lib/perfectqueue/backend/rdb_compat.rb, line 342
# Builds the attribute hash for a DB row.
#
# now - Integer epoch used to classify WAITING vs RUNNING for live rows,
#       or nil to report every live row as RUNNING
# row - Hash-like row with :id, :timeout, :data, :created_at, :resource,
#       :max_running
#
# @return [Hash] task attributes (:status, :data, :type, ...)
def create_attributes(now, row)
  compression = nil
  created_at = row[:created_at]
  if created_at.nil?
    # created_at is cleared when a task finishes.
    status = TaskStatus::FINISHED
  elsif now && row[:timeout] < now
    status = TaskStatus::WAITING
  else
    status = TaskStatus::RUNNING
  end

  raw = row[:data]
  if raw.nil? || raw == ''
    data = {}
  else
    # automatic gzip decompression
    raw.force_encoding('ASCII-8BIT') if raw.respond_to?(:force_encoding)
    if raw[0, 2] == GZIP_MAGIC_BYTES
      compression = 'gzip'
      reader = Zlib::GzipReader.new(StringIO.new(raw))
      begin
        raw = reader.read
      ensure
        reader.close
      end
    end

    data = begin
      JSON.parse(raw)
    rescue
      {}  # unparsable payload degrades to empty attributes
    end
  end

  # The task type is stored in the payload; fall back to the key prefix
  # before the first dot.
  type = data.delete('type')
  type = row[:id].split(/\./, 2)[0] if type.nil? || type.empty?

  {
    :status => status,
    :created_at => created_at,
    :data => data,
    :type => type,
    :user => row[:resource],
    :timeout => row[:timeout],
    :max_running => row[:max_running],
    :message => nil,  # not supported
    :node => nil,  # not supported
    :compression => compression,
  }
end