class PerfectQueue::Backend::RDBBackend
Constants
- DELETE_OFFSET
- MAX_RETRY
Attributes
db[R]
Public Class Methods
new(uri, table, config={})
click to toggle source
# File lib/perfectqueue/backend/rdb.rb, line 12
# Builds a backend bound to one table of a MySQL database.
#
# uri    - connection URI string; the user, password, host, port and the
#          first path segment (used as the database name) are read from it.
#          The port falls back to 3306 when the URI does not carry one.
# table  - table name stored in @table for use by the query methods.
# config - optional Hash. Keys read here: :pq_connect_timeout (default 20),
#          :connect_timeout (default 3), :sslca and :ssl_mode (both only
#          forwarded when set to a truthy value).
#
# Finishes by running an empty block through #connect as a connection test.
def initialize(uri, table, config={})
  @uri = uri
  @table = table

  parsed = URI.parse(@uri)
  @pq_connect_timeout = config.fetch(:pq_connect_timeout, 20)

  options = {
    max_connections: 1,
    user: parsed.user,
    password: parsed.password,
    host: parsed.host,
    port: (parsed.port || 3306).to_i,
    connect_timeout: config.fetch(:connect_timeout, 3)
  }
  # Optional SSL settings are passed through only when the caller supplied them.
  [:sslca, :ssl_mode].each do |key|
    options[key] = config[key] if config[key]
  end

  database_name = parsed.path.split('/')[1]
  @db = Sequel.mysql2(database_name, options)
  @mutex = Mutex.new

  connect {
    # connection test
  }
end
Public Instance Methods
cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second))
click to toggle source
# File lib/perfectqueue/backend/rdb.rb, line 52
# Marks the row with the given id as cancelled.
#
# The row is matched only while its created_at is non-NULL. On a match,
# created_at and resource are set to NULL and timeout is set to
# now + delete_timeout - DELETE_OFFSET.
#
# id             - id of the row to cancel.
# delete_timeout - seconds added to +now+ when computing the new timeout.
# now            - current time in seconds (defaults to the realtime clock).
#
# Returns true when at least one row was updated, false otherwise.
def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second))
  connect do
    statement = "UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;"
    new_timeout = now + delete_timeout - DELETE_OFFSET
    affected_rows = @db[statement, new_timeout, id].update
    return affected_rows > 0
  end
end
submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource=nil, max_running=nil)
click to toggle source
# File lib/perfectqueue/backend/rdb.rb, line 39
# Inserts a new row into the queue table.
#
# id          - value for the id column.
# data        - payload; wrapped in Sequel::SQL::Blob before being bound.
# time        - seconds value stored in both the timeout and created_at
#               columns (defaults to the realtime clock).
# resource    - value for the resource column (may be nil).
# max_running - value for the max_running column (may be nil).
#
# Returns true when the row was inserted, or nil when the insert raised
# Sequel::UniqueConstraintViolation.
def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource=nil, max_running=nil)
  connect {
    begin
      blob = Sequel::SQL::Blob.new(data)
      @db.sql_log_level = :debug
      @db[
        "INSERT INTO `#{@table}` (id, timeout, data, created_at, resource, max_running) VALUES (?, ?, ?, ?, ?, ?);",
        id, time, blob, time, resource, max_running
      ].insert
      return true
    rescue Sequel::UniqueConstraintViolation
      # A unique-constraint violation is reported to the caller as nil rather than raised.
      return nil
    end
  }
end
Private Instance Methods
connect() { || ... }
click to toggle source
# File lib/perfectqueue/backend/rdb.rb, line 60
# Runs the given block while holding @mutex and calls @db.disconnect once the
# block has finished, whether it returned or raised.
#
# Retry policy (one counter is shared by both cases, limit MAX_RETRY):
# * Sequel::DatabaseConnectionError - retried after a 2 second sleep while the
#   counter is below MAX_RETRY and @pq_connect_timeout seconds have not yet
#   elapsed since the call started.
# * any other StandardError whose message contains
#   'try restarting transaction' - retried after a 0.5 second sleep while the
#   counter is below MAX_RETRY.
# Every other error, and any error that has exhausted its retries, is
# re-raised to the caller. Progress is reported on STDERR.
#
# Returns the value of the block.
def connect
  # The deadline measures elapsed time, so it uses the monotonic clock:
  # wall-clock adjustments must not shorten or extend the retry window.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second) + @pq_connect_timeout
  @mutex.synchronize do
    retry_count = 0
    begin
      yield
    rescue Sequel::DatabaseConnectionError => e
      retry_count += 1
      if retry_count < MAX_RETRY && deadline > Process.clock_gettime(Process::CLOCK_MONOTONIC, :second)
        STDERR.puts "#{e}\n retrying."
        sleep 2
        retry
      end
      STDERR.puts "#{e}\n abort."
      raise
    rescue => e
      # workaround for "Mysql2::Error: Deadlock found when trying to get lock; try restarting transaction" error
      if e.to_s.include?('try restarting transaction')
        # Log text: the error followed by its backtrace (an absent backtrace is treated as empty).
        err = (e.backtrace || []).map { |bt| " #{bt}" }.unshift(e).join("\n")
        retry_count += 1
        if retry_count < MAX_RETRY
          STDERR.puts "#{err}\n retrying."
          sleep 0.5
          retry
        end
        STDERR.puts "#{err}\n abort."
      end
      raise
    ensure
      @db.disconnect
    end
  end
end