class Embulk::PostgresJsonOutputPlugin

Public Class Methods

connect(task) { |pg| ... } click to toggle source
# File lib/embulk/output/postgres_json.rb, line 51
# Opens a JDBC connection to the PostgreSQL server described by +task+.
#
# The JDBC URL is built from the 'host', 'port' and 'database' entries of
# +task+; the 'username' and 'password' entries are passed as the "user" and
# "password" connection properties.
#
# When a block is given, the connection is yielded to it and closed as soon
# as the block finishes, even if the block raises. Without a block the caller
# is responsible for closing the connection.
#
# Returns the connection object in both cases (it is already closed when a
# block was given).
def self.connect(task)
  jdbc_url = "jdbc:postgresql://#{task['host']}:#{task['port']}/#{task['database']}"

  credentials = java.util.Properties.new
  credentials.put("user", task['username'])
  credentials.put("password", task['password'])

  connection = org.postgresql.Driver.new.connect(jdbc_url, credentials)
  return connection unless block_given?

  begin
    yield connection
  ensure
    # Release the connection no matter how the block exits.
    connection.close
  end
  connection
end
execute_sql(pg, sql, *args) click to toggle source
# File lib/embulk/output/postgres_json.rb, line 68
# Executes a single SQL statement on an open JDBC connection.
#
# pg   - connection object; must respond to createStatement (and to
#        prepareStatement when bind values are supplied).
# sql  - SQL text to execute.
# args - optional bind values for "?" placeholders in +sql+; they are bound
#        in order, starting at parameter index 1.
#
# Returns whatever the statement's execute call returns. The statement is
# closed before returning, even if execution raises.
def self.execute_sql(pg, sql, *args)
  if args.empty?
    stmt = pg.createStatement
    begin
      stmt.execute(sql)
    ensure
      stmt.close
    end
  else
    # Bind values used to be accepted and then silently ignored; honour them
    # by going through a prepared statement instead.
    stmt = pg.prepareStatement(sql)
    begin
      args.each_with_index { |value, position| stmt.setObject(position + 1, value) }
      stmt.execute
    ensure
      stmt.close
    end
  end
end
new(task, schema, index) click to toggle source
Calls superclass method
# File lib/embulk/output/postgres_json.rb, line 77
# Builds the plugin instance.
#
# Forwards +task+, +schema+ and +index+ to the superclass initializer, then
# opens a connection via the class-level connect helper and keeps it in @pg
# for later use by this instance.
def initialize(task, schema, index)
  super(task, schema, index)
  @pg = self.class.connect(task)
end
transaction(config, schema, processor_count) { |task| ... } click to toggle source
# File lib/embulk/output/postgres_json.rb, line 8
# Drives one complete load: prepares a temporary table, lets the caller's
# block run with the task, copies the temporary table into the destination
# table, and finally drops the temporary table.
#
# config          - object answering +param(name, type, options)+; supplies
#                   'host', 'port' (default 5432), 'username', 'password'
#                   (default ''), 'database', 'table', 'column' (default
#                   'json') and 'column_type' (default 'json').
# schema          - not used by this method.
# processor_count - not used by this method.
#
# Yields the task hash, which additionally carries the generated
# 'temp_table' name.
#
# Returns an empty hash.
def self.transaction(config, schema, processor_count, &control)
  task = {}
  task['host'] = config.param('host', :string)
  task['port'] = config.param('port', :integer, default: 5432)
  task['username'] = config.param('username', :string)
  task['password'] = config.param('password', :string, default: '')
  task['database'] = config.param('database', :string)
  task['table'] = config.param('table', :string)
  task['column'] = config.param('column', :string, default: 'json')
  task['column_type'] = config.param('column_type', :string, default: 'json')

  # TODO add org.embulk.spi.ExecSession.getTransactionUniqueName() method
  # Until then, derive a suffix from the current time (seconds + nanoseconds).
  started_at = Time.now
  suffix = format("%08x%08x", started_at.tv_sec, started_at.tv_nsec)
  task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{suffix}"

  # Double-quotes the task entry stored under +key+; evaluated at call time so
  # each statement sees the task's current values.
  quoted = ->(key) { %["#{task[key]}"] }

  connect(task) do |pg|
    # drop table if exists "TEMP"
    execute_sql(pg, "drop table if exists #{quoted.call('temp_table')}")
    # create table "TEMP" ("COL" TYPE)
    execute_sql(pg, "create table #{quoted.call('temp_table')} (#{quoted.call('column')} #{task['column_type']})")
  end

  begin
    yield(task)

    connect(task) do |pg|
      # create table if not exists "DEST" ("COL" TYPE)
      execute_sql(pg, "create table if not exists #{quoted.call('table')} (#{quoted.call('column')} #{task['column_type']})")
      # insert into "DEST" ("COL") select "COL" from "TEMP"
      execute_sql(pg, "insert into #{quoted.call('table')} (#{quoted.call('column')}) select #{quoted.call('column')} from #{quoted.call('temp_table')}")
    end
  ensure
    # The temporary table is removed whether or not the load succeeded.
    connect(task) do |pg|
      execute_sql(pg, "drop table if exists #{quoted.call('temp_table')}")
    end
  end

  {}
end

Public Instance Methods

abort() click to toggle source
# File lib/embulk/output/postgres_json.rb, line 101
# Intentionally a no-op: this method performs no work and returns nil.
def abort
  nil
end
add(page) click to toggle source
# File lib/embulk/output/postgres_json.rb, line 86
# Inserts every record of +page+ into the temporary table as JSON text.
#
# page - enumerable of records; each record is serialized with #to_json and
#        inserted as one row, cast to the configured column type.
#
# Uses the connection held in @pg and the 'temp_table', 'column' and
# 'column_type' entries of @task. The prepared statement is closed before
# returning, even if an insert raises.
def add(page)
  # The column identifier is double-quoted so it matches the quoted name the
  # table was created with; left unquoted, PostgreSQL would fold it to lower
  # case and fail on mixed-case or reserved-word column names.
  insert_sql = %[insert into "#{@task['temp_table']}" ("#{@task['column']}") values (?::#{@task['column_type']})]
  prep = @pg.prepareStatement(insert_sql)
  begin
    page.each do |record|
      prep.setString(1, record.to_json)
      prep.execute
    end
  ensure
    prep.close
  end
end
close() click to toggle source
# File lib/embulk/output/postgres_json.rb, line 82
# Closes the connection held in @pg and returns the result of that call.
def close
  connection = @pg
  connection.close
end
commit() click to toggle source
# File lib/embulk/output/postgres_json.rb, line 104
# Performs no work of its own; returns a new empty hash.
def commit
  Hash.new
end
finish() click to toggle source
# File lib/embulk/output/postgres_json.rb, line 98
# Intentionally a no-op: this method performs no work and returns nil.
def finish
  nil
end