class Embulk::Output::Sqlite3OutputPlugin

Public Class Methods

connect(task) { |sqlite| ... } click to toggle source
# File lib/embulk/output/sqlite3.rb, line 48
# Opens a JDBC connection to the SQLite database named by task['database'].
#
# Without a block, the open connection is returned and the caller is
# responsible for closing it. With a block, the connection is yielded, then
# closed once the block finishes (even if it raises); the now-closed
# connection object is still the return value.
def self.connect(task)
  connection = org.sqlite.JDBC.new.connect("jdbc:sqlite:#{task['database']}", java.util.Properties.new)
  return connection unless block_given?

  begin
    yield connection
  ensure
    connection.close
  end
  connection
end
execute_sql(sqlite, sql, *args) click to toggle source
# File lib/embulk/output/sqlite3.rb, line 61
# Executes +sql+ on the given connection and always closes the statement.
#
# sqlite - connection object providing createStatement / prepareStatement.
# sql    - SQL text; may contain `?` placeholders when +args+ are given.
# args   - optional values bound positionally (1-based) to the placeholders.
#
# With no +args+ a plain statement is used. When +args+ are supplied they
# are bound through a prepared statement instead of being silently ignored.
#
# Returns whatever the statement's execute call returns.
def self.execute_sql(sqlite, sql, *args)
  stmt = args.empty? ? sqlite.createStatement : sqlite.prepareStatement(sql)
  begin
    if args.empty?
      stmt.execute(sql)
    else
      args.each_with_index { |arg, index| stmt.setObject(index + 1, arg) }
      # A prepared statement already carries its SQL; execute takes no argument.
      stmt.execute
    end
  ensure
    stmt.close
  end
end
to_sqlite_column_type(type) click to toggle source
# File lib/embulk/output/sqlite3.rb, line 33
# Translates a column type name into the type name used in the SQLite
# schema: 'long' becomes 'integer', 'string' and 'timestamp' become 'text',
# 'double' becomes 'real'. Any other value is returned unchanged.
def self.to_sqlite_column_type(type)
  case type
  when 'long' then 'integer'
  when 'string', 'timestamp' then 'text'
  when 'double' then 'real'
  else type
  end
end
to_sqlite_schema(schema) click to toggle source
# File lib/embulk/output/sqlite3.rb, line 29
# Builds the comma-separated column definition list for a CREATE TABLE
# statement: each column becomes its backtick-quoted name followed by the
# SQLite type that to_sqlite_column_type gives for the column's type.
def self.to_sqlite_schema(schema)
  definitions = schema.map do |column|
    sqlite_type = to_sqlite_column_type(column.type.to_s)
    "`#{column.name}` #{sqlite_type}"
  end
  definitions.join(',')
end
transaction(config, schema, count) { |task| ... } click to toggle source
# File lib/embulk/output/sqlite3.rb, line 9
# Prepares the output run: builds the task hash, makes sure the target table
# exists, runs the output tasks by yielding the task, and returns an empty
# config diff.
#
# The task carries the 'database' and 'table' config parameters plus, per
# schema column, its backtick-quoted name ('columns') and its SQLite type
# name ('column_types').
def self.transaction(config, schema, count, &control)
  quoted_names = schema.map { |column| "`#{column.name}`" }
  sqlite_types = schema.map { |column| "#{to_sqlite_column_type(column.type.to_s)}" }

  task = {
    'database' => config.param('database', :string),
    'table' => config.param('table', :string),
    'columns' => quoted_names,
    'column_types' => sqlite_types,
  }

  # Create the table up front on a short-lived connection.
  connect(task) do |sqlite|
    ddl = %[create table if not exists #{task['table']}(#{to_sqlite_schema(schema)})]
    execute_sql(sqlite, ddl)
  end

  # The value returned by the block (the commit reports) is not used.
  yield(task)

  {}
end

Public Instance Methods

abort() click to toggle source
# File lib/embulk/output/sqlite3.rb, line 110
# Intentionally empty: this plugin does nothing on abort.
# NOTE(review): rows already written by #add are not rolled back here.
def abort
end
add(page) click to toggle source
# File lib/embulk/output/sqlite3.rb, line 79
# Inserts every record of +page+ into the target table through a single
# prepared statement and increments @records once per inserted row.
#
# page - enumerable of records; each record is indexed by column position.
#
# Values are bound according to the entries of @task['column_types'], which
# hold SQLite type names ('integer', 'real', 'text', ...):
#   * 'integer'            -> setLong, so 64-bit values are not truncated
#   * 'real' (or 'double') -> setDouble
#   * anything else        -> setString of the value's to_s
# nil values are bound as SQL NULL regardless of the column type.
#
# The statement is closed even when an insert raises.
def add(page)
  columns = @task['columns']
  placeholders = columns.map { '?' }.join(',')
  prep = @sqlite.prepareStatement(%[insert into #{@task['table']}(#{columns.join(',')}) values (#{placeholders})])
  sql_null_type = 0 # numeric value of java.sql.Types::NULL
  begin
    page.each do |record|
      @task['column_types'].each_with_index do |type, index|
        position = index + 1 # JDBC parameter indexes are 1-based
        value = record[index]

        if value.nil?
          prep.setNull(position, sql_null_type)
          next
        end

        case type
        when 'integer' then prep.setLong(position, value)
        when 'real', 'double' then prep.setDouble(position, value.to_f)
        else prep.setString(position, value.to_s)
        end
      end

      prep.execute
      @records += 1
    end
  ensure
    prep.close
  end
end
close() click to toggle source
# File lib/embulk/output/sqlite3.rb, line 75
# Closes the connection stored in @sqlite (assigned in #init).
def close
  @sqlite.close
end
commit() click to toggle source
# File lib/embulk/output/sqlite3.rb, line 113
# Returns this task's commit report: a hash whose "records" entry is the
# current value of @records.
def commit
  { 'records' => @records }
end
finish() click to toggle source
# File lib/embulk/output/sqlite3.rb, line 107
# Intentionally empty: nothing is buffered by #add, which executes each
# insert immediately, so there is nothing to flush here.
def finish
end
init() click to toggle source
# File lib/embulk/output/sqlite3.rb, line 70
# Sets up per-task state: opens a connection for this task and resets the
# counter of written records.
def init
  # connect is called without a block, so the connection it returns stays
  # open; it is closed later in #close.
  @sqlite = self.class.connect(task)
  @records = 0
end