class Embulk::Output::Redis
Public Class Methods
transaction(config, schema, count) { |task| ... }
click to toggle source
# File lib/embulk/output/rediskeys.rb, line 11
#
# Builds the task hash from the plugin configuration, yields it to the
# caller's block, and prints the reports the block returns.
#
# @param config [#param] option source; every option is read through
#   `param(name, type, default: value)`
# @param schema not used by this method
# @param count not used by this method
# @yieldparam task [Hash] string-keyed hash with 'host', 'port', 'db',
#   'key_prefix' and 'encode'
# @yieldreturn [#to_json] task reports, only used for the log line
# @return [Hash] always an empty hash
def self.transaction(config, schema, count, &control)
  # configuration code: option name, declared type, fallback value
  option_specs = [
    ['host',       :string,  'localhost'],
    ['port',       :integer, 6379],
    ['db',         :integer, 0],
    ['key_prefix', :string,  ''],
    ['encode',     :string,  'json']
  ]
  task = option_specs.each_with_object({}) do |(name, type, fallback), settings|
    settings[name] = config.param(name, type, default: fallback)
  end

  # non-resumable output: the block is invoked directly instead of going
  # through resume(task, schema, count, &control)
  reports = yield(task)
  puts "Redis output finished. Commit reports = #{reports.to_json}"

  {}
end
Public Instance Methods
abort()
click to toggle source
# File lib/embulk/output/rediskeys.rb, line 92
#
# No-op: the body is empty, so the method returns nil.
#
# @return [nil]
def abort; end
add(page)
click to toggle source
# File lib/embulk/output/rediskeys.rb, line 52
#
# Writes the values of +page+ to Redis.
#
# Every value of every row is parsed as a JSON object. The entry whose key
# starts with task['key_prefix'] is stored under that key, either as a JSON
# string (encode 'json', via SET) or as a Redis hash (encode 'hash', via
# HMSET). Any other encode value stores nothing but still marks the key as
# seen. A key is written at most once per instance; repeats only produce a
# warning. @rows counts every parsed value, written or not.
#
# @param page [#each] yields rows; each row yields JSON strings
# @return the result of page.each
# @raise [JSON::ParserError] when a value is not valid JSON
def add(page)
  # The prefix is compared literally. The previous regexp interpolation let
  # characters such as "." or "*" in the prefix match unintended keys.
  prefix = task['key_prefix'].to_s

  # output code:
  page.each do |records|
    puts "Schema: #{schema.names}"
    records.each do |record|
      hash = JSON.parse(record)

      # Take the last key carrying the prefix. Earlier code always ended up
      # with the last key of the record, matching or not, so this keeps the
      # result identical whenever that last key did match.
      k = hash.keys.reverse_each.find { |key| key.start_with?(prefix) }

      if k.nil?
        # Nothing to store; previously a "null" value was written under an
        # unrelated key in this situation.
        puts "Warning: no key starts with #{prefix.inspect}"
      else
        puts "KEY: #{k}"
        @processed_keys << k

        if @unique_keys.include?(k)
          puts "Warning: #{k} is already exists"
        else
          v = hash[k]
          case task['encode']
          when 'json'
            @redis.set(k, v.to_json)
          when 'hash'
            flat = v.to_a.flatten
            puts "VALUE: #{v}"
            puts "FLATTEN: #{flat}"
            @redis.hmset(k, flat)
          end
          @unique_keys << k
        end
      end

      @rows += 1 # increment anyway
    end
  end
end
close()
click to toggle source
# File lib/embulk/output/rediskeys.rb, line 49
#
# No-op: the body is empty, so the method returns nil.
#
# @return [nil]
def close; end
commit()
click to toggle source
# File lib/embulk/output/rediskeys.rb, line 95
#
# Builds the task report from the counters kept on this instance.
#
# @return [Hash] "rows" holds @rows as-is; "processed_keys" and
#   "unique_keys" hold the +inspect+ strings of the respective collections
def commit
  processed = @processed_keys.inspect
  unique = @unique_keys.inspect

  { "rows" => @rows, "processed_keys" => processed, "unique_keys" => unique }
end
finish()
click to toggle source
# File lib/embulk/output/rediskeys.rb, line 89
#
# No-op: the body is empty, so the method returns nil.
#
# @return [nil]
def finish; end
init()
click to toggle source
# Yields +task+ to the caller's block and returns an empty hash. The value
# returned by the block is discarded.
#
# @param task [Hash] passed to the block unchanged
# @param schema not used by this method
# @param count not used by this method
# @yieldparam task [Hash] the same object given as +task+
# @return [Hash] always an empty hash
# @raise [LocalJumpError] when called without a block
def self.resume(task, schema, count, &control)
  yield(task)
  {}
end
Calls superclass method
# File lib/embulk/output/rediskeys.rb, line 39
#
# Prepares the per-instance state: prints a start-up line, calls the
# superclass +init+, zeroes the row counter, creates two empty sets for key
# tracking and builds the Redis client from the task's 'host', 'port' and
# 'db' entries.
#
# @return the Redis client stored in @redis
def init
  # initialization code:
  puts "Redis output thread #{index}..."
  super

  @rows = 0
  @processed_keys = ::Set.new
  @unique_keys = ::Set.new
  @redis = ::Redis.new(host: task['host'], port: task['port'], db: task['db'])
end