class Restrainer
Redis backed throttling mechanism to ensure that only a limited number of processes can be executed at any one time.
Usage:
Restrainer.new(:foo, 10).throttle do # Do something end
If more than the specified number of processes as identified by the name argument is currently running, then the throttle block will raise an error.
Constants
- ADD_PROCESS_SCRIPT
Attributes
Public Class Methods
Create a new restrainer. The name is used to identify the Restrainer
and group processes together. You can create any number of Restrainers with different names.
The required limit parameter specifies the maximum number of processes that will be allowed to execute the throttle block at any point in time.
The timeout parameter is used for cleaning up internal data structures so that jobs aren't orphaned if their process is killed. Processes will automatically be removed from the running jobs list after the specified number of seconds. Note that the Restrainer
will not handle timing out any code itself. This value is just used to insure the integrity of internal data structures.
# File lib/restrainer.rb, line 91 def initialize(name, limit:, timeout: 60, redis: nil) @name = name @limit = limit @timeout = timeout @key = "#{self.class.name}.#{name.to_s}" @redis ||= redis end
Either configure the redis instance using a block or yield the instance. Configuring with a block allows you to use things like connection pools etc. without hard coding a single instance.
Example: `Restrainer.redis { redis_pool.instance }`
# File lib/restrainer.rb, line 62 def redis(&block) if block @redis = block elsif defined?(@redis) && @redis @redis.call else raise "#{self.class.name}.redis not configured" end end
Set the redis instance to a specific instance. It is usually preferable to use the block form for configurating the instance so that it can be evaluated at runtime.
Example: `Restrainer.redis = Redis.new`
# File lib/restrainer.rb, line 76 def redis=(conn) @redis = lambda{ conn } end
Public Instance Methods
Clear all locks
# File lib/restrainer.rb, line 148 def clear! redis.del(key) end
Get the number of processes currently being executed for this restrainer.
# File lib/restrainer.rb, line 143 def current redis.zcard(key).to_i end
Obtain a lock on one the allowed processes. The method returns a process identifier that must be passed to the release! to release the lock. You can pass in a unique identifier if you already have one.
Raises a Restrainer::ThrottledError
if the lock cannot be obtained.
The limit argument can be used to override the value set in the constructor.
# File lib/restrainer.rb, line 125 def lock!(process_id = nil, limit: nil) process_id ||= SecureRandom.uuid limit ||= self.limit # limit of less zero is no limit; limit of zero is allow none return nil if limit < 0 raise ThrottledError.new("#{self.class}: #{@name} is not allowing any processing") if limit == 0 add_process!(redis, process_id, limit) process_id end
release one of the allowed processes. You must pass in a process id returned by the lock method.
# File lib/restrainer.rb, line 138 def release!(process_id) remove_process!(redis, process_id) unless process_id.nil? end
Wrap a block with this method to throttle concurrent execution. If more than the alotted number of processes (as identified by the name) are currently executing, then a Restrainer::ThrottledError
will be raised.
The limit argument can be used to override the value set in the constructor.
# File lib/restrainer.rb, line 104 def throttle(limit: nil) limit ||= self.limit # limit of less zero is no limit; limit of zero is allow none return yield if limit < 0 process_id = lock!(limit: limit) begin yield ensure release!(process_id) end end
Private Instance Methods
Add a process to the currently run set.
# File lib/restrainer.rb, line 164 def add_process!(redis, process_id, throttle_limit) process_count = eval_script(redis, process_id, throttle_limit) if process_count >= throttle_limit raise ThrottledError.new("#{self.class}: #{@name} already has #{process_count} processes running") end end
Evaluate and execute a Lua script on the redis server.
# File lib/restrainer.rb, line 177 def eval_script(redis, process_id, throttle_limit) sha1 = @add_process_sha1 if sha1 == nil sha1 = redis.script(:load, ADD_PROCESS_SCRIPT) @add_process_sha1 = sha1 end begin redis.evalsha(sha1, [], [key, process_id, throttle_limit, @timeout, Time.now.to_i]) rescue Redis::CommandError => e if e.message.include?('NOSCRIPT') sha1 = redis.script(:load, ADD_PROCESS_SCRIPT) @add_process_sha1 = sha1 retry else raise e end end end
Hash key in redis to story a sorted set of current processes.
# File lib/restrainer.rb, line 159 def key @key end
# File lib/restrainer.rb, line 154 def redis @redis || self.class.redis end
Remove a process to the currently run set.
# File lib/restrainer.rb, line 172 def remove_process!(redis, process_id) redis.zrem(key, process_id) end