class Restrainer

Redis-backed throttling mechanism that ensures only a limited number of processes can execute a block of code at any one time.

Usage:

Restrainer.new(:foo, limit: 10).throttle do
  # Do something
end

If the specified number of processes, as identified by the name argument, are already running, then the throttle block will raise a Restrainer::ThrottledError.

Constants

ADD_PROCESS_SCRIPT

Attributes

limit[R]
name[R]

Public Class Methods

new(name, limit:, timeout: 60, redis: nil)

Create a new restrainer. The name is used to identify the Restrainer and group processes together. You can create any number of Restrainers with different names.

The required limit parameter specifies the maximum number of processes that will be allowed to execute the throttle block at any point in time.

The timeout parameter is used for cleaning up internal data structures so that jobs aren't orphaned if their process is killed. Processes will automatically be removed from the running jobs list after the specified number of seconds. Note that the Restrainer will not time out any code itself. This value is only used to ensure the integrity of internal data structures.

# File lib/restrainer.rb, line 91
def initialize(name, limit:, timeout: 60, redis: nil)
  @name = name
  @limit = limit
  @timeout = timeout
  @key = "#{self.class.name}.#{name.to_s}"
  @redis ||= redis
end
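
The cleanup role of the timeout can be pictured with a small in-memory model. This is only a sketch under stated assumptions: RunningSet is a hypothetical class, not part of Restrainer, and the real bookkeeping happens in Redis through ADD_PROCESS_SCRIPT, whose contents are not shown here. The sketch assumes each entry is stamped with the time its lock was taken and that entries older than the timeout are evicted before the limit is checked.

```ruby
# Hypothetical in-memory stand-in for the set of running processes.
# Keys are process ids; values are the time (in seconds) each lock was taken.
class RunningSet
  def initialize(limit:, timeout:)
    @limit = limit
    @timeout = timeout
    @entries = {}
  end

  # Try to register a process at time `now`. Returns true if admitted.
  def add(process_id, now)
    # Evict entries older than the timeout so a killed process frees its slot.
    @entries.delete_if { |_id, started_at| started_at <= now - @timeout }
    return false if @entries.size >= @limit

    @entries[process_id] = now
    true
  end

  def size
    @entries.size
  end
end

set = RunningSet.new(limit: 1, timeout: 60)
set.add("a", 0)   # admitted
set.add("b", 30)  # rejected: "a" still holds the only slot
set.add("b", 61)  # admitted: "a" is older than the timeout and is evicted
```

Nothing in the model interrupts "a" itself. If that process were still alive at second 61, two processes would be running at once, which is why the timeout should comfortably exceed the longest expected run time of the throttled block.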
redis(&block)

When called with a block, stores the block as the way to obtain the redis instance. When called without a block, returns the configured instance. Configuring with a block allows you to use things like connection pools without hard coding a single instance.

Example: `Restrainer.redis { redis_pool.instance }`

# File lib/restrainer.rb, line 62
def redis(&block)
  if block
    @redis = block
  elsif defined?(@redis) && @redis
    @redis.call
  else
    raise "#{self.class.name}.redis not configured"
  end
end
redis=(conn)

Set the redis instance to a specific instance. It is usually preferable to use the block form for configuring the instance so that it can be evaluated at runtime.

Example: `Restrainer.redis = Redis.new`

# File lib/restrainer.rb, line 76
def redis=(conn)
  @redis = lambda{ conn }
end
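
The difference between the two configuration styles can be seen in isolation. The sketch below uses a hypothetical ConnectionConfig class that copies the logic of the two class methods listed above, with symbols standing in for real Redis clients; it is an illustration, not code from the library.

```ruby
# Hypothetical stand-in that mirrors the two class-level methods listed above.
class ConnectionConfig
  class << self
    def redis(&block)
      if block
        @redis = block           # remember how to obtain a connection
      elsif defined?(@redis) && @redis
        @redis.call              # evaluate the stored block on every read
      else
        raise "redis not configured"
      end
    end

    def redis=(conn)
      @redis = lambda { conn }   # always hand back the same object
    end
  end
end

connections = [:conn_a, :conn_b] # placeholders for real client objects

# Block form: the block runs each time the connection is requested.
ConnectionConfig.redis { connections.first }
before = ConnectionConfig.redis
connections.shift
after = ConnectionConfig.redis   # reflects the change made to `connections`

# Setter form: one fixed object is captured at assignment time.
ConnectionConfig.redis = :fixed_conn
fixed = ConnectionConfig.redis
```

Because the block is re-evaluated on each read, it can defer to whatever connection source the application manages at runtime, whereas the setter pins a single object for the life of the process.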

Public Instance Methods

clear!()

Clear all locks held for this restrainer.

# File lib/restrainer.rb, line 148
def clear!
  redis.del(key)
end
current()

Get the number of processes currently being executed for this restrainer.

# File lib/restrainer.rb, line 143
def current
  redis.zcard(key).to_i
end
lock!(process_id = nil, limit: nil)

Obtain a lock on one of the allowed processes. The method returns a process identifier that must be passed to release! to release the lock. You can pass in a unique identifier if you already have one.

Raises a Restrainer::ThrottledError if the lock cannot be obtained.

The limit argument can be used to override the value set in the constructor.

# File lib/restrainer.rb, line 125
def lock!(process_id = nil, limit: nil)
  process_id ||= SecureRandom.uuid
  limit ||= self.limit

  # a limit less than zero means no limit; a limit of zero allows none
  return nil if limit < 0
  raise ThrottledError.new("#{self.class}: #{@name} is not allowing any processing") if limit == 0

  add_process!(redis, process_id, limit)
  process_id
end
release!(process_id)

Release one of the allowed processes. You must pass in a process id returned by the lock! method.

# File lib/restrainer.rb, line 138
def release!(process_id)
  remove_process!(redis, process_id) unless process_id.nil?
end
throttle(limit: nil) { || ... }

Wrap a block with this method to throttle concurrent execution. If the allotted number of processes (as identified by the name) are already executing, then a Restrainer::ThrottledError will be raised.

The limit argument can be used to override the value set in the constructor.

# File lib/restrainer.rb, line 104
def throttle(limit: nil)
  limit ||= self.limit

  # a limit less than zero means no limit; a limit of zero allows none
  return yield if limit < 0

  process_id = lock!(limit: limit)
  begin
    yield
  ensure
    release!(process_id)
  end
end
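
The begin/ensure pairing above guarantees that a slot is given back even when the wrapped block fails. The sketch below shows that behavior with a hypothetical LocalLimiter that has the same lock!/release!/throttle shape but keeps its state in a plain Hash instead of Redis. It is not thread-safe and only models a positive limit; both simplifications are assumptions made for brevity.

```ruby
require "securerandom"

# Stand-in error class for this sketch; the library raises Restrainer::ThrottledError.
class ThrottledError < StandardError; end

# Hypothetical in-memory limiter with the same method shape as the class above.
class LocalLimiter
  def initialize(limit)
    @limit = limit
    @holders = {}
  end

  def current
    @holders.size
  end

  def lock!
    raise ThrottledError, "limit of #{@limit} reached" if @holders.size >= @limit

    id = SecureRandom.uuid
    @holders[id] = true
    id
  end

  def release!(id)
    @holders.delete(id) unless id.nil?
  end

  def throttle
    id = lock!
    begin
      yield
    ensure
      release!(id) # runs whether the block returns or raises
    end
  end
end

limiter = LocalLimiter.new(1)

# The slot is released even though the block raises.
begin
  limiter.throttle { raise ArgumentError, "boom" }
rescue ArgumentError
  # the original error still propagates to the caller
end
limiter.current # back to 0

# A nested call exceeds the limit of 1 and is rejected.
outcome = limiter.throttle do
  begin
    limiter.throttle { :inner }
  rescue ThrottledError
    :throttled
  end
end
```

Callers that drive lock! and release! by hand need the same discipline: release in an ensure clause, or the slot stays occupied until the timeout removes it.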

Private Instance Methods

add_process!(redis, process_id, throttle_limit)

Add a process to the currently running set.

# File lib/restrainer.rb, line 164
def add_process!(redis, process_id, throttle_limit)
  process_count = eval_script(redis, process_id, throttle_limit)
  if process_count >= throttle_limit
    raise ThrottledError.new("#{self.class}: #{@name} already has #{process_count} processes running")
  end
end
eval_script(redis, process_id, throttle_limit)

Evaluate and execute a Lua script on the redis server.

# File lib/restrainer.rb, line 177
def eval_script(redis, process_id, throttle_limit)
  sha1 = @add_process_sha1
  if sha1 == nil
    sha1 = redis.script(:load, ADD_PROCESS_SCRIPT)
    @add_process_sha1 = sha1
  end

  begin
    redis.evalsha(sha1, [], [key, process_id, throttle_limit, @timeout, Time.now.to_i])
  rescue Redis::CommandError => e
    if e.message.include?('NOSCRIPT')
      sha1 = redis.script(:load, ADD_PROCESS_SCRIPT)
      @add_process_sha1 = sha1
      retry
    else
      raise e
    end
  end
end
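
The listing above follows a common pattern: cache the script's SHA1, call the script by hash, and reload it if the server reports that it no longer has the script. The sketch below isolates that control flow. FakeScriptServer, NoScriptError, and ScriptRunner are hypothetical names invented for this illustration; they imitate a server-side script cache and are not the redis gem's API.

```ruby
require "digest/sha1"

# Hypothetical error raised when the server does not know a script hash.
class NoScriptError < StandardError; end

# Hypothetical stand-in for a server-side script cache.
class FakeScriptServer
  def initialize
    @scripts = {}
  end

  # Store the script and return its SHA1, as a script load would.
  def script_load(source)
    sha = Digest::SHA1.hexdigest(source)
    @scripts[sha] = source
    sha
  end

  # Run a script by hash; fail if the cache no longer has it.
  def evalsha(sha)
    raise NoScriptError, "NOSCRIPT No matching script" unless @scripts.key?(sha)

    "ran #{@scripts[sha]}"
  end

  # Simulate the cache being emptied, for example by a server restart.
  def flush
    @scripts.clear
  end
end

class ScriptRunner
  def initialize(server, source)
    @server = server
    @source = source
    @sha = nil
  end

  def run
    @sha ||= @server.script_load(@source) # load once, then reuse the hash
    begin
      @server.evalsha(@sha)
    rescue NoScriptError
      @sha = @server.script_load(@source) # the cache was lost: reload
      retry                               # re-run the begin block
    end
  end
end

server = FakeScriptServer.new
runner = ScriptRunner.new(server, "return 1")
runner.run   # loads the script, then runs it by hash
server.flush # the server forgets every cached script
runner.run   # hits NoScriptError, reloads, and succeeds on retry
```

As in the listing above, the retry is unbounded: it relies on a fresh load making the next call succeed. A more defensive variant could cap the number of attempts.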
key()

Key in redis used to store the sorted set of current processes.

# File lib/restrainer.rb, line 159
def key
  @key
end
redis()
# File lib/restrainer.rb, line 154
def redis
  @redis || self.class.redis
end
remove_process!(redis, process_id)

Remove a process from the currently running set.

# File lib/restrainer.rb, line 172
def remove_process!(redis, process_id)
  redis.zrem(key, process_id)
end