class Embulk::Output::Bigobject
Public Class Methods
column_options_map(column_options)
click to toggle source
# File lib/embulk/output/bigobject.rb, line 28
# Indexes the configured column option entries by their 'name' value.
#
# @param column_options [Array<Hash>, nil] option entries; nil is treated as empty
# @return [Hash] each entry keyed by its 'name'; when two entries share a
#   name the later one replaces the earlier one
def self.column_options_map(column_options)
  by_name = {}
  (column_options || {}).each do |column_option|
    by_name[column_option['name']] = column_option
  end
  by_name
end
configure(config, schema, count)
click to toggle source
# File lib/embulk/output/bigobject.rb, line 13
# Reads the plugin settings from +config+ into a plain task hash.
#
# +schema+ and +count+ are accepted but not used here.
#
# @param config [#param] configuration source queried via config.param
# @return [Hash] task hash with the keys "host", "restport", "ncport",
#   "table", "rowcolumn", "column_options" and "payload_column_index"
def self.configure(config, schema, count)
  # configuration code:
  task = {}
  task["host"] = config.param("host", :string, :default => "localhost") # string, optional
  task["restport"] = config.param("restport", :integer, :default => 9090) # integer, optional
  task["ncport"] = config.param("ncport", :integer, :default => 9091) # integer, optional
  task["table"] = config.param("table", :string) # string, required
  task["rowcolumn"] = config.param("rowcolumn", :string, :default => "ROW") # string, optional
  task["column_options"] = config.param("column_options", :array, :default => [])
  task["payload_column_index"] = config.param("payload_column_index", :integer, :default => nil)
  task
end
create_botable_stmt(tbl,rowcol,schema, cos, is_payload)
click to toggle source
# File lib/embulk/output/bigobject.rb, line 157
# Builds the CREATE TABLE statement text for the target table.
#
# @param tbl [String] table name
# @param rowcol [String] keyword placed between CREATE and TABLE
# @param schema [#each] input columns (each responds to index, name, type, format);
#   only read when +is_payload+ is false
# @param cos [Array<Hash>] column option entries ("name", "type", "is_key")
# @param is_payload [Boolean] when true the column list comes from +cos+ alone
# @return [String] the statement; a KEY(...) clause is appended when any
#   entry in +cos+ has a truthy "is_key"
def self.create_botable_stmt(tbl,rowcol,schema, cos, is_payload)
  column_defs = []
  if is_payload
    # Payload mode: the declared column options are the whole table schema.
    cos.each do |co|
      Embulk.logger.debug {"#{co}"}
      column_defs << "#{co["name"]} #{co["type"]}"
    end
  else
    # Options are looked up by column position; a missing entry means "no overrides".
    schema.each do |c|
      co = cos[c.index] || {}
      Embulk.logger.debug {"#{c.index}, #{c.name}, #{co}"}
      column_name = co["name"] || c.name
      column_type = to_bigobject_column_type(c.type.to_s, c.format.to_s, co)
      column_defs << "#{column_name} #{column_type}"
    end
  end
  bo_table_schema = column_defs.join(',')

  key_names = []
  cos.each { |co| key_names << co["name"] if co["is_key"] }

  return "CREATE #{rowcol} TABLE #{tbl} (#{bo_table_schema})" if key_names.empty?

  "CREATE #{rowcol} TABLE #{tbl} (#{bo_table_schema} KEY(#{key_names.join(',')}))"
end
new(task, schema, index)
click to toggle source
# Resume hook: hands +task+ to the given block and reports an empty
# next-config diff. +schema+ and +count+ are not used.
#
# @yield [task] the block that runs the tasks
# @return [Hash] always an empty hash
def self.resume(task, schema, count, &control)
  yield(task) # the block's result is not used
  {}
end
Calls superclass method
# File lib/embulk/output/bigobject.rb, line 89
# Sets up per-instance state after delegating to the superclass initializer.
#
# @param task [Hash] task hash; its "table" entry is stored in @table
# @param schema passed through to the superclass
# @param index passed through to the superclass
def initialize(task, schema, index)
  super # bare super forwards task, schema and index unchanged
  # Embulk.logger.debug { "Initialize #{index}" }
  # initialization code:
  @counter = 0 # records written by this instance so far
  @table = task["table"]
end
rest_exec(uri, stmt)
click to toggle source
# File lib/embulk/output/bigobject.rb, line 139
# Posts a statement as JSON ({"Stmt": ...}) to +uri+ and parses the JSON reply.
#
# A RestClient::Exception on the first attempt is logged as a warning and the
# request is sent once more; a RestClient::Exception on the second attempt is
# logged as an error. A reply to the first attempt that is not valid JSON is
# logged as an error instead of propagating.
#
# @param uri [String] endpoint the statement is posted to
# @param stmt [#to_s] statement text
# @return [Object] the parsed JSON reply on success; on a logged failure the
#   value returned by the logger call (callers should not rely on it)
def self.rest_exec(uri, stmt)
  payload = { "Stmt" => "#{stmt}" }.to_json

  # NOTE(review): :timeout travels in the third (headers) argument of
  # RestClient.post; confirm rest-client really applies it as a request
  # timeout rather than sending it as an HTTP header.
  post = lambda do |timeout|
    response = RestClient.post uri, payload, :content_type => :json, :accept => :json, :timeout => timeout
    JSON.parse(response.body)
  end

  begin
    post.call(2)
  rescue RestClient::Exception
    #Embulk.logger.error { "RestClient: #{e.http_code}, #{e.message}, response: #{e.response}" }
    Embulk.logger.warn { "Timeout: statement: #{stmt}" }
    begin
      post.call(4)
    rescue RestClient::Exception => e2
      Embulk.logger.error { "RestClient: #{e2.http_code}, #{e2.message}, response: #{e2.response}" }
    end
  rescue JSON::ParserError => e
    # JSON::Exception is not defined by the json library, so naming it here
    # turned every malformed reply into a NameError; ParserError is what
    # JSON.parse raises for invalid input.
    Embulk.logger.error { "JSON: #{e.message}" }
  end
end
to_bigobject_column_type(type, format, co)
click to toggle source
# File lib/embulk/output/bigobject.rb, line 185
# Picks the BigObject column type for one input column.
#
# @param type [String] input column type name ('long', 'boolean', 'string',
#   'double' or 'timestamp')
# @param format [String] timestamp format; only consulted for 'timestamp'
# @param co [Hash, nil] column option entry; a truthy "type" overrides the mapping
# @return [String, Symbol, nil] co["type"] when given, otherwise the mapped
#   type symbol, or nil for a type name that is not listed
def self.to_bigobject_column_type(type, format, co)
  co ||= {}
  Embulk.logger.debug {"type: #{type}, format #{format}, option #{co}"}

  explicit_type = co["type"]
  return explicit_type if explicit_type

  case type
  when 'long' then :INT64
  when 'boolean' then :INT8
  when 'string' then :STRING
  when 'double' then :DOUBLE
  when 'timestamp'
    # A format carrying an hour field needs the date-time type.
    format.include?("%H") ? :DATETIME32 : :DATE32
  end
end
transaction(config, schema, count) { |task| ... }
click to toggle source
# File lib/embulk/output/bigobject.rb, line 34
# Prepares the task, makes sure the target table exists and then runs the
# tasks through the given block.
#
# The table is probed with a "desc" statement: status 0 means it exists,
# status -11 means it does not and a CREATE TABLE statement is issued.
#
# @yield [task] the block that runs the tasks
# @return [Hash] always an empty next-config diff
# @raise [RuntimeError] when the table cannot be created or the probe returns
#   any other status
def self.transaction(config, schema, count, &control)
  task = self.configure(config, schema, count)
  task['co_map'] = self.column_options_map(task["column_options"])
  task['rest_uri'] = "http://#{task['host']}:#{task['restport']}/cmd".freeze
  task['ttl_counter'] = 0

  Embulk.logger.debug { "Transaction #{count}" }
  # resumable output:
  # resume(task, schema, count, &control)

  # Create-Table if it does not exist
  response = rest_exec(task['rest_uri'], "desc #{task['table']}") # check table
  case response["Status"]
  when 0 # the table exists
    Embulk.logger.debug { "#{response}" }
  when -11 # the table does not exist
    create_stmt = create_botable_stmt(
      "#{task['table']}",
      "#{task['rowcolumn']}",
      schema,
      task["column_options"],
      !(task['payload_column_index'].nil?)
    )
    response = rest_exec(task['rest_uri'], "#{create_stmt}")
    unless response["Status"] == 0
      Embulk.logger.error { "#{response}" }
      raise "Create table #{task['table']} in BigObject Failed"
    end
    Embulk.logger.info { "embulk-output-bigobject: Create table #{task['table']}" }
  else # should not be here
    Embulk.logger.error { "#{response}" }
    raise "Please check table #{task['table']} in BigObject First"
  end

  # non-resumable output:
  yield(task) # the block's result is not used
  {}
end
Public Instance Methods
abort()
click to toggle source
# File lib/embulk/output/bigobject.rb, line 128
# Always fails with a message pointing the operator at BigObject.
#
# @raise [RuntimeError] unconditionally
def abort
  raise RuntimeError, "Please Check BigObject"
end
add(page)
click to toggle source
# File lib/embulk/output/bigobject.rb, line 101
# Turns every record of +page+ into one text line and writes all lines with
# a single safe_io_write call.
#
# When the task has a "payload_column_index", only that column of each record
# is written. Otherwise every value is double-quoted (embedded double quotes
# are doubled) and the values are joined with commas.
#
# @param page [#each] records; each record responds to [] and each
# @return [Integer] the updated @task['ttl_counter']
def add(page)
  pindex = @task['payload_column_index']
  Embulk.logger.debug "#{pindex}"

  # Pick the per-record formatter once instead of branching inside the loop.
  to_line =
    if pindex
      lambda { |record| "#{record[pindex]}\n" }
    else
      lambda do |record|
        quoted = []
        record.each { |row| quoted << "\"#{row.to_s.gsub(/\"/,"\"\"")}\"" }
        "#{quoted.join(",")}\n"
      end
    end

  lines = []
  page.each { |record| lines << to_line.call(record) }

  safe_io_write "#{lines.join}"
  @counter += lines.length
  @task['ttl_counter'] += lines.length
end
close()
click to toggle source
# File lib/embulk/output/bigobject.rb, line 98 def close end
commit()
click to toggle source
# File lib/embulk/output/bigobject.rb, line 132
# Reports how many records this instance has counted.
#
# @return [Hash] task report of the form { "records" => @counter }
def commit
  { "records" => @counter }
end
finish()
click to toggle source
# File lib/embulk/output/bigobject.rb, line 125 def finish end
safe_io_write(buff)
click to toggle source
# File lib/embulk/output/bigobject.rb, line 66
# Writes +buff+ to the class-wide @@io while holding the class-wide mutex.
#
# Both @@io (built by create_shared_io) and the mutex are created on first use.
# NOTE(review): the two `||=` initializations run outside the mutex; confirm
# that concurrent first calls cannot happen.
#
# @param buff [String] text to write
# @return the result of @@io.write
def safe_io_write(buff)
  @@io ||= create_shared_io
  @@mutext ||= Mutex.new
  @@mutext.synchronize { @@io.write(buff) }
end