class Adrian::DirectoryQueue

Attributes

available_path[R]
logger[R]
reserved_path[R]

Public Class Methods

create(options = {}) click to toggle source
# File lib/adrian/directory_queue.rb, line 8
def self.create(options = {})
  queue = new(options)
  FileUtils.mkdir_p(queue.available_path)
  FileUtils.mkdir_p(queue.reserved_path)
  queue
end
new(options = {}) click to toggle source

Note: There is the possibility of an item being consumed by multiple processes when its still in the queue after its lock expires. The reason for allowing this is:

1. It's much simpler than introducing a seperate monitoring process to handle lock expiry.
2. This is an acceptable and rare event. e.g. it only happens when the process working on the item crashes without being able to release the lock
Calls superclass method Adrian::Queue::new
# File lib/adrian/directory_queue.rb, line 22
def initialize(options = {})
  super
  @available_path = options.fetch(:path)
  @reserved_path  = options.fetch(:reserved_path, default_reserved_path)
  @logger         = options[:logger]
  filters << Filters::FileLock.new(:duration => options[:lock_duration], :reserved_path => reserved_path)
  filters << Filters::Delay.new(:duration => options[:delay]) if options[:delay]
end

Public Instance Methods

include?(value) click to toggle source
# File lib/adrian/directory_queue.rb, line 48
def include?(value)
  item = wrap_item(value)
  items.include?(item)
end
length() click to toggle source
# File lib/adrian/directory_queue.rb, line 44
def length
  available_files.count { |file| File.file?(file) }
end
pop_item() click to toggle source
# File lib/adrian/directory_queue.rb, line 31
def pop_item
  while item = items.shift
    return item if reserve(item)
  end
end
push_item(value) click to toggle source
# File lib/adrian/directory_queue.rb, line 37
def push_item(value)
  item = wrap_item(value)
  item.move(available_path)
  item.touch
  self
end

Protected Instance Methods

available_files() click to toggle source
# File lib/adrian/directory_queue.rb, line 80
def available_files
  Dir.glob("#{available_path}/*")
end
default_reserved_path() click to toggle source
# File lib/adrian/directory_queue.rb, line 88
def default_reserved_path
  File.join(@available_path, 'cur')
end
files() click to toggle source
# File lib/adrian/directory_queue.rb, line 76
def files
  (available_files + reserved_files).select { |file| File.file?(file) }
end
items() click to toggle source
# File lib/adrian/directory_queue.rb, line 69
def items
  return @items if @items && @items.length > 0
  @items = files.map! { |file| wrap_item(file) }
  @items.reject! { |item| !item.exist? || filter?(item) }
  @items.sort_by!(&:updated_at)
end
reserve(item) click to toggle source
# File lib/adrian/directory_queue.rb, line 61
def reserve(item)
  item.move(reserved_path)
  item.touch
  true
rescue Errno::ENOENT => e
  false
end
reserved_files() click to toggle source
# File lib/adrian/directory_queue.rb, line 84
def reserved_files
  Dir.glob("#{reserved_path}/*")
end
wrap_item(value) click to toggle source
# File lib/adrian/directory_queue.rb, line 55
def wrap_item(value)
  item = value.is_a?(FileItem) ? value : FileItem.new(value)
  item.logger ||= logger
  item
end