class Adrian::DirectoryQueue
Attributes
available_path[R]
logger[R]
reserved_path[R]
Public Class Methods
create(options = {})
click to toggle source
# File lib/adrian/directory_queue.rb, line 8 def self.create(options = {}) queue = new(options) FileUtils.mkdir_p(queue.available_path) FileUtils.mkdir_p(queue.reserved_path) queue end
new(options = {})
click to toggle source
Note: There is the possibility of an item being consumed by multiple processes when its still in the queue after its lock expires. The reason for allowing this is:
1. It's much simpler than introducing a seperate monitoring process to handle lock expiry. 2. This is an acceptable and rare event. e.g. it only happens when the process working on the item crashes without being able to release the lock
Calls superclass method
Adrian::Queue::new
# File lib/adrian/directory_queue.rb, line 22 def initialize(options = {}) super @available_path = options.fetch(:path) @reserved_path = options.fetch(:reserved_path, default_reserved_path) @logger = options[:logger] filters << Filters::FileLock.new(:duration => options[:lock_duration], :reserved_path => reserved_path) filters << Filters::Delay.new(:duration => options[:delay]) if options[:delay] end
Public Instance Methods
include?(value)
click to toggle source
# File lib/adrian/directory_queue.rb, line 48 def include?(value) item = wrap_item(value) items.include?(item) end
length()
click to toggle source
# File lib/adrian/directory_queue.rb, line 44 def length available_files.count { |file| File.file?(file) } end
pop_item()
click to toggle source
# File lib/adrian/directory_queue.rb, line 31 def pop_item while item = items.shift return item if reserve(item) end end
push_item(value)
click to toggle source
# File lib/adrian/directory_queue.rb, line 37 def push_item(value) item = wrap_item(value) item.move(available_path) item.touch self end
Protected Instance Methods
available_files()
click to toggle source
# File lib/adrian/directory_queue.rb, line 80 def available_files Dir.glob("#{available_path}/*") end
default_reserved_path()
click to toggle source
# File lib/adrian/directory_queue.rb, line 88 def default_reserved_path File.join(@available_path, 'cur') end
files()
click to toggle source
# File lib/adrian/directory_queue.rb, line 76 def files (available_files + reserved_files).select { |file| File.file?(file) } end
items()
click to toggle source
# File lib/adrian/directory_queue.rb, line 69 def items return @items if @items && @items.length > 0 @items = files.map! { |file| wrap_item(file) } @items.reject! { |item| !item.exist? || filter?(item) } @items.sort_by!(&:updated_at) end
reserve(item)
click to toggle source
# File lib/adrian/directory_queue.rb, line 61 def reserve(item) item.move(reserved_path) item.touch true rescue Errno::ENOENT => e false end
reserved_files()
click to toggle source
# File lib/adrian/directory_queue.rb, line 84 def reserved_files Dir.glob("#{reserved_path}/*") end
wrap_item(value)
click to toggle source
# File lib/adrian/directory_queue.rb, line 55 def wrap_item(value) item = value.is_a?(FileItem) ? value : FileItem.new(value) item.logger ||= logger item end