class Fluent::Plugin::LevelDBStorage
Constants
- DEFAULT_DIR_MODE
Attributes
leveldb[R]
store[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_leveldb.rb, line 21 def initialize super @store = {} end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_leveldb.rb, line 27 def configure(conf) super if @path if File.exist?(@path) && File.file?(@path) raise Fluent::ConfigError, "path for <storage> should be a directory." elsif File.exist?(@path) && File.directory?(@path) @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.db") @multi_workers_available = true end elsif root_dir = owner.plugin_root_dir basename = (conf.arg && !conf.arg.empty?) ? "storage.#{conf.arg}.db" : "storage.db" @path = File.join(root_dir, basename) @multi_workers_available = true else raise Fluent::ConfigError, "path for <storage> is required." end dir = File.dirname(@path) FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir) if File.exist?(@path) raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path) else raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable? end @leveldb = LevelDB::DB.new(@path) object = @leveldb.get(@root_key) if object begin data = Yajl::Parser.parse(object) raise Fluent::ConfigError, "Invalid contents (not object) in plugin leveldb storage: '#{@path}'" unless data.is_a?(Hash) rescue => e log.error "failed to read data from plugin redis storage", path: @path, error: e raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin leveldb storage: '#{@path}'" end end end
delete(key)
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 108 def delete(key) @store.delete(key.to_s) end
fetch(key, defval)
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 100 def fetch(key, defval) @store.fetch(key.to_s, defval) end
get(key)
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 96 def get(key) @store[key.to_s] end
load()
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 70 def load begin json_string = @leveldb.get(@root_key) json = Yajl::Parser.parse(json_string) unless json.is_a?(Hash) log.error "broken content for plugin storage (Hash required: ignored)", type: json.class log.debug "broken content", content: json_string return end @store = json rescue => e log.error "failed to load data for plugin storage from leveldb", path: @path, error: e end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 66 def multi_workers_ready? @multi_workers_available end
put(key, value)
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 104 def put(key, value) @store[key.to_s] = value end
save()
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 85 def save begin json_string = Yajl::Encoder.encode(@store) @leveldb.batch do @leveldb.put(@root_key, json_string) end rescue => e log.error "failed to save data for plugin storage to leveldb", path: @path, error: e end end
update(key, &block)
click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 112 def update(key, &block) @store[key.to_s] = block.call(@store[key.to_s]) end