class Embulk::Output::Bigobject

Public Class Methods

column_options_map(column_options) click to toggle source
# File lib/embulk/output/bigobject.rb, line 28
# Indexes the per-column option hashes by their 'name' entry.
#
# @param column_options [Array<Hash>, nil] option hashes, each read via ['name']
# @return [Hash] option hashes keyed by name; empty when column_options is nil
def self.column_options_map(column_options)
  options = column_options || {}
  options.each_with_object({}) do |option, by_name|
    by_name[option['name']] = option
  end
end
configure(config, schema, count) click to toggle source
# File lib/embulk/output/bigobject.rb, line 13
# Reads the plugin settings from +config+ and returns them as the task hash.
#
# Only "table" has no default; every other setting falls back to the default
# given below.
#
# @param config [#param] configuration source, queried with config.param
# @param schema [Object] not used by this method
# @param count [Object] not used by this method
# @return [Hash] settings keyed by option name (string keys)
def self.configure(config, schema, count)
  {
    "host" => config.param("host", :string, default: "localhost"),
    "restport" => config.param("restport", :integer, default: 9090),
    "ncport" => config.param("ncport", :integer, default: 9091),
    "table" => config.param("table", :string),
    "rowcolumn" => config.param("rowcolumn", :string, default: "ROW"),
    "column_options" => config.param("column_options", :array, default: []),
    "payload_column_index" => config.param("payload_column_index", :integer, default: nil)
  }
end
create_botable_stmt(tbl,rowcol,schema, cos, is_payload) click to toggle source
# File lib/embulk/output/bigobject.rb, line 157
# Builds the CREATE TABLE statement text for the target table.
#
# In payload mode the column list comes straight from the "name"/"type"
# entries of +cos+; otherwise it is derived from +schema+, with the option
# hash found at the column's index in +cos+ able to override name and type.
# A KEY(...) clause is appended when any option hash has "is_key" set.
#
# @param tbl [String] table name
# @param rowcol [String] word placed between CREATE and TABLE
# @param schema [Enumerable] columns responding to index, name, type and format
# @param cos [Array<Hash>] column option hashes
# @param is_payload [Boolean] whether to take the column list from +cos+ only
# @return [String] the CREATE statement
def self.create_botable_stmt(tbl, rowcol, schema, cos, is_payload)
  column_defs =
    if is_payload
      cos.map do |option|
        Embulk.logger.debug { "#{option}" }
        "#{option["name"]} #{option["type"]}"
      end
    else
      schema.map do |column|
        option = cos[column.index] || {}
        Embulk.logger.debug { "#{column.index}, #{column.name}, #{option}" }
        column_name = option["name"] || column.name
        column_type = to_bigobject_column_type(column.type.to_s, column.format.to_s, option)
        "#{column_name} #{column_type}"
      end
    end

  key_names = cos.select { |option| option["is_key"] }.map { |option| option["name"] }

  table_body = column_defs.join(',')
  table_body = "#{table_body} KEY(#{key_names.join(',')})" unless key_names.empty?
  "CREATE #{rowcol} TABLE #{tbl} (#{table_body})"
end
new(task, schema, index) click to toggle source

# Yields +task+ to the given block and returns an empty config diff.
#
# @param task [Hash] task settings handed to the block
# @param schema [Object] not used by this method
# @param count [Object] not used by this method
# @yieldparam task [Hash] the task passed in
# @return [Hash] always an empty hash
def self.resume(task, schema, count, &control)
  yield(task)
  {}
end

Calls superclass method
# File lib/embulk/output/bigobject.rb, line 89
# Sets up per-instance state after delegating to the superclass.
#
# @param task [Hash] task settings; "table" is read from it
# @param schema [Object] passed through to the superclass
# @param index [Object] passed through to the superclass
def initialize(task, schema, index)
  super(task, schema, index)

  # Target table name and the number of records written by this instance.
  @table = task["table"]
  @counter = 0
end
rest_exec(uri, stmt) click to toggle source
# File lib/embulk/output/bigobject.rb, line 139
# Posts a statement to the REST endpoint and returns the parsed JSON reply.
#
# The request is attempted once with :timeout => 2. If RestClient raises, a
# warning is logged and the request is retried once with :timeout => 4.
# A second RestClient failure, or a JSON error on either attempt, is logged
# and nil is returned.
#
# @param uri [String] URL the statement is posted to
# @param stmt [#to_s] statement text, sent as the "Stmt" field of a JSON body
# @return [Object, nil] the parsed JSON response, or nil after a logged failure
def self.rest_exec(uri, stmt)
  payload = { "Stmt" => "#{stmt}" }.to_json

  # NOTE(review): RestClient.post takes (url, payload, headers); :timeout is
  # passed inside that third hash here -- confirm it is honored as a request
  # timeout rather than being sent as a header.
  post = lambda do |timeout|
    response = RestClient.post uri, payload, :content_type => :json, :accept => :json, :timeout => timeout
    JSON.parse(response.body)
  end

  begin
    post.call(2)
  rescue RestClient::Exception
    #Embulk.logger.error { "RestClient: #{e.http_code}, #{e.message}, response: #{e.response}" }
    Embulk.logger.warn { "Timeout: statement: #{stmt}" }
    post.call(4)
  end
rescue RestClient::Exception => e
  Embulk.logger.error { "RestClient: #{e.http_code}, #{e.message}, response: #{e.response}" }
  nil
rescue JSON::JSONError => e
  # JSON::JSONError is the JSON library's base error class; the previous
  # JSON::Exception is not defined, so this clause raised NameError.
  Embulk.logger.error { "JSON: #{e.message}" }
  nil
end
to_bigobject_column_type(type, format, co) click to toggle source
# File lib/embulk/output/bigobject.rb, line 185
# Chooses the column type for one input column.
#
# An explicit "type" entry in +co+ wins. Otherwise the type name is mapped:
# long to INT64, boolean to INT8, string to STRING, double to DOUBLE, and
# timestamp to DATETIME32 when the format contains "%H", else DATE32.
#
# @param type [String] input column type name
# @param format [String] timestamp format; only consulted for 'timestamp'
# @param co [Hash, nil] column option hash
# @return [Object, nil] the "type" option value, a mapped Symbol, or nil when
#   the type name is not one of the names above
def self.to_bigobject_column_type(type, format, co)
  co ||= {}
  Embulk.logger.debug {"type: #{type}, format #{format}, option #{co}"}

  override = co["type"]
  return override if override

  case type
  when 'long' then :INT64
  when 'boolean' then :INT8
  when 'string' then :STRING
  when 'double' then :DOUBLE
  when 'timestamp'
    format.include?("%H") ? :DATETIME32 : :DATE32
  end
end
transaction(config, schema, count) { |task| ... } click to toggle source
# File lib/embulk/output/bigobject.rb, line 34
# Prepares the task, makes sure the target table exists, then runs the output.
#
# The table is probed with a "desc" statement. Status 0 means it exists;
# status -11 means it does not, in which case it is created. Any other
# outcome, including a reply that is not a Hash (rest_exec can return one
# when the request fails), raises.
#
# @param config [#param] configuration source passed to configure
# @param schema [Enumerable] input schema, used to build the CREATE statement
# @param count [Object] only logged here
# @yieldparam task [Hash] the prepared task settings
# @return [Hash] always an empty config diff
# @raise [RuntimeError] when the table cannot be checked or created
def self.transaction(config, schema, count, &control)
  task = self.configure(config, schema, count)
  task['co_map'] = self.column_options_map task["column_options"]
  task['rest_uri'] = "http://#{task['host']}:#{task['restport']}/cmd".freeze
  task['ttl_counter'] = 0

  Embulk.logger.debug { "Transaction #{count}" }
  # resumable output:
  # resume(task, schema, count, &control)

  # A failed request does not yield a Hash; treat that as "no status" so the
  # branches below raise their descriptive errors instead of NoMethodError.
  status_of = lambda { |reply| reply.is_a?(Hash) ? reply["Status"] : nil }

  # Create-Table if it does not exist
  response = rest_exec(task['rest_uri'], "desc #{task['table']}") # check table
  status = status_of.call(response)
  if status == 0 # the table exists
    Embulk.logger.debug { "#{response}" }
  elsif status == -11 # the table does not exist
    create_stmt = create_botable_stmt("#{task['table']}", "#{task['rowcolumn']}", schema,
                                      task["column_options"], !task['payload_column_index'].nil?)
    response = rest_exec(task['rest_uri'], "#{create_stmt}")
    if status_of.call(response) != 0
      Embulk.logger.error { "#{response}" }
      raise "Create table #{task['table']} in BigObject Failed"
    end
    Embulk.logger.info { "embulk-output-bigobject: Create table #{task['table']}" }
  else # should not be here
    Embulk.logger.error { "#{response}" }
    raise "Please check table #{task['table']} in BigObject First"
  end

  # non-resumable output:
  yield(task)
  {}
end

Public Instance Methods

abort() click to toggle source
# File lib/embulk/output/bigobject.rb, line 128
# Always fails with a fixed message pointing the operator at BigObject.
#
# @raise [RuntimeError] unconditionally
def abort
  raise RuntimeError, "Please Check BigObject"
end
add(page) click to toggle source
# File lib/embulk/output/bigobject.rb, line 101
# Serializes every record of +page+ to one text line and writes the lines out.
#
# With a payload column index configured, each line is that column's value as
# is. Otherwise each value is converted with to_s, wrapped in double quotes
# with embedded double quotes doubled, and the values are joined with commas.
# Both the per-instance counter and the task's 'ttl_counter' grow by the
# number of lines written.
#
# @param page [Enumerable] records, each indexable and enumerable
# @return [Integer] the updated 'ttl_counter' value
def add(page)
  payload_index = @task['payload_column_index']
  Embulk.logger.debug "#{payload_index}"

  lines =
    if payload_index
      page.map { |record| "#{record[payload_index]}\n" }
    else
      page.map do |record|
        quoted = record.map { |value| "\"#{value.to_s.gsub(/\"/, "\"\"")}\"" }
        "#{quoted.join(",")}\n"
      end
    end

  safe_io_write "#{lines.join}"

  @counter += lines.length
  @task['ttl_counter'] += lines.length
end
close() click to toggle source
# File lib/embulk/output/bigobject.rb, line 98
# No-op: this method has an empty body and releases nothing.
# NOTE(review): the socket opened by create_shared_io is not closed here.
def close
end
commit() click to toggle source
# File lib/embulk/output/bigobject.rb, line 132
# Reports how many records this instance has counted.
#
# @return [Hash] a task report of the form {"records" => count}
def commit
  { "records" => @counter }
end
create_shared_io() click to toggle source
# File lib/embulk/output/bigobject.rb, line 74
# Opens the TCP connection used for bulk writes and sends the stream header.
#
# The header is the bytes "csv\x01" followed by the table name on its own line.
#
# @return [TCPSocket] the connected socket, ready for record data
def create_shared_io
  TCPSocket.new(@task['host'], @task['ncport']).tap do |socket|
    socket.write "csv\x01"
    socket.puts @task['table']
  end
end
finish() click to toggle source
# File lib/embulk/output/bigobject.rb, line 125
# No-op: this method has an empty body; nothing is flushed or finalized here.
def finish
end
safe_io_write(buff) click to toggle source
# File lib/embulk/output/bigobject.rb, line 66
  # Lock guarding the shared connection. It is created once, when this code is
  # loaded, so that every caller synchronizes on the same mutex. Creating the
  # mutex lazily inside the method left a window in which concurrent first
  # calls could each build their own mutex and socket.
  IO_MUTEX = Mutex.new

  # Writes +buff+ to the connection shared by all instances, one caller at a time.
  #
  # The connection is opened on first use, inside the lock, via create_shared_io.
  #
  # @param buff [String] data to write
  # @return [Object] whatever the underlying write call returns
  def safe_io_write(buff)
    IO_MUTEX.synchronize do
      @@io ||= create_shared_io
      @@io.write buff
    end
  end