class Embulk::Input::SoracomHarvest::Plugin

Constants

END_POINT_URL_DEFAULT
PREVIEW_COUNT
RETRY_INITIAL_WAIT_SEC_DEFAULT
RETRY_LIMIT_DEFAULT
TAG_VALUE_MATCH_MODE_DEFAULT

Attributes

end_datetime[R]
filter[R]
last_record[R]
start_datetime[R]

Public Class Methods

embulk_columns(config) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 211
# Builds the Embulk Column list from the `columns` config array.
# Each entry supplies 'name', 'type' (converted to a Symbol) and an optional
# 'format'; the column index is left nil.
def self.embulk_columns(config)
  config.param(:columns, :array).map do |col|
    Column.new(nil, col['name'], col['type'].to_sym, col['format'])
  end
end
get_harvest_schema(record) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 159
# Returns the guessed column schema for a Harvest data entry.
# 'content' is typed 'json' only when the entry's contentType is exactly
# 'application/json'; otherwise it is a 'string'.
def self.get_harvest_schema(record)
  json_content = record['contentType'] == 'application/json'
  content_column = {name: 'content', type: (json_content ? 'json' : 'string')}
  [
    content_column,
    {name: 'contentType', type: 'string'},
    {name: 'time', type: 'timestamp'},
  ]
end
get_sim_schema(sim) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 138
# Returns the guessed column schema for a SIM (subscriber) record: one
# {name:, type:} entry per key of `sim`, in the record's key order.
# Known keys get a specific Embulk type; anything else is a 'string'.
def self.get_sim_schema(sim)
  sim.map do |key, _value|
    column_type =
      case key
      when 'plan' then 'long'
      when 'createdAt', 'lastModifiedAt', 'expiredAt', 'expiryTime', 'createdTime', 'lastModifiedTime' then 'timestamp'
      when 'imeiLock', 'terminationEnabled' then 'boolean'
      when 'tags', 'sessionStatus' then 'json'
      else 'string'
      end
    {name: key, type: column_type}
  end
end
guess(config) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 67
# Guesses the output columns by fetching one SIM (and, for the Harvest
# target, one data entry of that SIM) and deriving a schema from it.
#
# Returns {'columns' => [...]} for Embulk's guess mechanism.
# Raises ConfigError when no SIM, or no Harvest record, can be found.
def self.guess(config)
  auth_key_id = config.param(:auth_key_id, :string)
  auth_key = config.param(:auth_key, :string)
  # Same default as #transaction, so guess and run agree on the target.
  target = config.param(:target, :string, default: 'harvest')
  tag_value_match_mode = config.param(:tag_value_match_mode, :string, default: TAG_VALUE_MATCH_MODE_DEFAULT)

  retry_limit = config.param(:retry_limit, :integer, default: RETRY_LIMIT_DEFAULT)
  retry_initial_wait_sec = config.param(:retry_initial_wait_sec, :integer, default: RETRY_INITIAL_WAIT_SEC_DEFAULT)

  options = {
    endpoint: config.param(:endpoint, :string, default: END_POINT_URL_DEFAULT),
    retry_limit: retry_limit,
    retry_initial_wait_sec: retry_initial_wait_sec,
  }
  client = SoracomClient.new(auth_key_id, auth_key, options)

  # @filter / @start_datetime / @end_datetime are only set by #init on a
  # plugin *instance*; in this class method they were always nil, so the
  # configured filter and time range were silently ignored. Read them from
  # config instead.
  filter = guess_filter(config)

  # TODO last_record
  sims = client.list_subscribers(filter: filter, limit: 1, last_record: nil, tag_value_match_mode: tag_value_match_mode)
  raise ConfigError.new "Failed to guess. No registered SIM found" if sims.size == 0

  Embulk::logger.info "Getting schema for target: '#{target}'"
  if target == 'sims'
    columns = self.get_sim_schema(sims.first)
  else
    records = client.list_subscribers_imsi_data(
      imsi: sims.first['imsi'],
      from: guess_unixtimestamp(config, :start_datetime),
      to: guess_unixtimestamp(config, :end_datetime),
      limit: 1
    )
    raise ConfigError.new "Failed to guess. No records found at Soracom Harvest" if records.size == 0
    columns = self.get_harvest_schema(records.first)
  end

  {
    'columns' => columns
  }
end

# Reads the optional 'filter' config ("key:value[,key:value...]") and turns it
# into a symbol-keyed Hash using the same splitting rules as #to_hash.
# Returns nil when no filter is configured.
def self.guess_filter(config)
  str = config.param(:filter, :string, default: nil)
  return nil if str.nil?
  str.delete(' ').split(/[:,]/).each_slice(2).map { |k, v| [k.to_sym, v] }.to_h
end

# Reads the optional datetime config value `key` and converts it to a UNIX
# timestamp in milliseconds (same conversion as #convert_to_unixtimestamp).
# Returns nil when unset; raises ConfigError when the value can't be parsed.
def self.guess_unixtimestamp(config, key)
  str = config.param(key, :string, default: nil)
  return nil if str.nil?
  begin
    time = Time.parse(str)
    time.to_i * 1000 + time.usec / 1000
  rescue
    raise ConfigError.new "Failed to convert ['#{str}'] to UNIX timestamp"
  end
end
resume(task, columns, count) { |task, columns, count| ... } click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 42
# Runs the tasks through the Embulk-supplied block and returns the first
# task report as the next config diff.
def self.resume(task, columns, count, &control)
  yield(task, columns, count).first
end
transaction(config, &control) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 18
# Reads the plugin configuration into a string-keyed task hash, builds the
# column list and hands both to #resume with a task count of 1.
def self.transaction(config, &control)
  task = {}
  task['auth_key'] = config.param('auth_key', :string)
  task['auth_key_id'] = config.param('auth_key_id', :string)
  task['target'] = config.param('target', :string, default: 'harvest')
  # TODO
  task['incremental'] = config.param('incremental', :bool, default: false)
  task['start_datetime'] = config.param('start_datetime', :string, default: nil)
  task['end_datetime'] = config.param('end_datetime', :string, default: nil)
  # task['last_record'] = config.param("last_record", :string, default: nil)
  task['endpoint'] = config.param('endpoint', :string, default: END_POINT_URL_DEFAULT)
  task['filter'] = config.param('filter', :string, default: nil)
  task['tag_value_match_mode'] = config.param('tag_value_match_mode', :string, default: TAG_VALUE_MATCH_MODE_DEFAULT)
  task['retry_limit'] = config.param('retry_limit', :integer, default: RETRY_LIMIT_DEFAULT)
  task['retry_initial_wait_sec'] = config.param('retry_initial_wait_sec', :integer, default: RETRY_INITIAL_WAIT_SEC_DEFAULT)
  task['columns'] = config.param('columns', :array)

  schema_columns = embulk_columns(config)
  resume(task, schema_columns, 1, &control)
end

Public Instance Methods

cast_value(column, value, is_harvest) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 177
# Converts a raw API value to what the Embulk column of type column['type']
# expects. Returns nil for nil or empty-string values.
#
# - 'timestamp': `value` is epoch milliseconds -> Time rounded to ms.
# - 'json': Harvest content arrives as a JSON string and is parsed;
#   SIM values are serialized with #to_json instead.
# - anything else: passed through unchanged.
#
# Raises DataError when a timestamp or Harvest JSON value can't be converted.
def cast_value(column, value, is_harvest)
  return nil if value.to_s.empty? # nil or empty string

  type_name = column['type'].to_s
  if type_name == 'timestamp'
    begin
      Time.at(value / 1000.0).round(3)
    rescue
      raise DataError.new "Can't parse as Time '#{value}' (column is #{column['name']})"
    end
  elsif type_name == 'json'
    return value.to_json unless is_harvest
    begin
      JSON.parse(value)
    rescue
      raise DataError.new "Can't parse as JSON '#{value}' (column is #{column['name']})"
    end
  else
    value
  end
end
convert_to_unixtimestamp(time) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 239
# Parses a datetime string and returns its UNIX time in whole milliseconds
# (sub-millisecond digits are dropped).
# Raises ConfigError when the string can't be parsed.
def convert_to_unixtimestamp(time)
  parsed = Time.parse(time)
  (parsed.to_i * 1000) + (parsed.nsec / 1_000_000)
rescue
  raise ConfigError.new "Failed to convert ['#{time}'] to UNIX timestamp"
end
convert_unixtime_to_date(unixtime) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 234
# Formats epoch milliseconds as a local-time string
# 'YYYY-mm-dd HH:MM:SS.mmm +zzzz'. Returns nil when `unixtime` is nil.
def convert_unixtime_to_date(unixtime)
  return nil if unixtime.nil?
  # Divide exactly: unixtime / 1000.0 goes through a Float (0.3 is really
  # 0.2999...), and %3N truncates, so milliseconds could print one too low.
  Time.at(Rational(unixtime, 1000)).strftime('%Y-%m-%d %H:%M:%S.%3N %z')
end
format_record(record, columns, is_harvest) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 169
# Produces one output row: for each configured column (in order), looks up
# record[column name as String] and casts it with #cast_value.
def format_record(record, columns, is_harvest)
  columns.map { |col| cast_value(col, record[col['name'].to_s], is_harvest) }
end
get_request_options(task) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 220
# Extracts the SoracomClient options (endpoint and retry settings) from the
# task. Uses string keys, matching how #transaction builds the task and how
# the rest of the plugin reads it, so it also works with a plain Hash.
def get_request_options(task)
  {
    endpoint: task['endpoint'],
    retry_limit: task['retry_limit'],
    retry_initial_wait_sec: task['retry_initial_wait_sec'],
  }
end
init() click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 49
# Prepares per-instance state from the task: converts the optional
# start/end datetimes to epoch milliseconds and parses the filter string.
# Raises ConfigError if a datetime is given while target is 'sims'.
def init
  start_str = task['start_datetime']
  end_str = task['end_datetime']
  sims_target = task['target'] == 'sims'

  if start_str
    raise ConfigError.new "'start_datetime' can't be used when 'target: sims'" if sims_target
    @start_datetime = convert_to_unixtimestamp(start_str)
  end

  if end_str
    raise ConfigError.new "'end_datetime' can't be used when 'target: sims'" if sims_target
    @end_datetime = convert_to_unixtimestamp(end_str)
  end

  # if task['last_record']
  #   @last_record = convert_to_unixtimestamp(task['last_record'])
  # end

  @filter = to_hash(task['filter'])
end
preview?() click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 202
# True when Embulk is running in preview mode. Exec.isPreview() raises a
# Java NullPointerException outside an Exec session; that case is
# reported as "not preview".
# http://www.embulk.org/docs/release/release-0.6.12.html
def preview?
  org.embulk.spi.Exec.isPreview()
rescue java.lang.NullPointerException
  false
end
run() click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 101
# Fetches SIMs (optionally filtered) and emits either one row per SIM
# (target 'sims') or one row per Harvest data entry of each SIM.
# In preview mode it stops after PREVIEW_COUNT SIMs.
#
# Returns {} unless 'incremental' is set; otherwise a task report whose
# :last_record is the newest Harvest entry time seen (nil if none).
def run
  client = SoracomClient.new(task['auth_key_id'], task['auth_key'], get_request_options(task))

  # TODO last_record
  sims = client.list_subscribers(filter: @filter, last_record: nil, tag_value_match_mode: task['tag_value_match_mode'])

  columns = task['columns']
  sims_target = task['target'] == 'sims'
  newest_time = nil

  sims.each_with_index do |sim, i|
    if sims_target
      page_builder.add(format_record(sim, columns, false))
    else
      # TODO last_record
      records = client.list_subscribers_imsi_data(imsi: sim['imsi'], from: @start_datetime, to: @end_datetime, last_record: @last_record)
      records.each do |record|
        page_builder.add(format_record(record, columns, true))
        # Track the maximum, not the last one processed: entries may arrive
        # newest-first and span several SIMs.
        time = record['time']
        newest_time = time if time && (newest_time.nil? || time > newest_time)
      end
    end
    break if preview? && i + 1 >= PREVIEW_COUNT
  end

  page_builder.finish

  return {} unless task['incremental']

  { last_record: convert_unixtime_to_date(newest_time) }
end
to_hash(str) click to toggle source
# File lib/embulk/input/soracom_harvest/plugin.rb, line 228
# Parses a filter string such as "key1:value1, key2:value2" into a Hash with
# Symbol keys and String values. Spaces are removed first; a dangling key
# maps to nil. Returns nil when `str` is nil.
def to_hash(str)
  return nil if str.nil?
  tokens = str.delete(' ').split(/[:,]/)
  tokens.each_slice(2).each_with_object({}) do |(key, val), acc|
    acc[key.to_sym] = val
  end
end