class Embulk::Output::Wendelin

Public Class Methods

transaction(config, schema, count) { |task| ... } click to toggle source
# File lib/embulk/output/wendelin.rb, line 10
def self.transaction(config, schema, count, &control)
  # configuration code:
  task = {
     "tag" => config.param("tag", :string, default: nil),
    # where Wendelin's Input Stream Tool is located,
    # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
    "streamtool_uri" => config.param("streamtool_uri", :string),

    # credentials to authenticate this fluentd to wendelin
    # by default credentials are not used
    # TODO user/password -> certificate
    "user" => config.param("user", :string, defualt: nil),
    "password" => config.param("password", :string, default: nil),

    "path_prefix" => config.param("path_prefix", :string,  :default => nil),
  }

  # resumable output:
  # resume(task, schema, count, &control)

  #Embulk.logger.info { config.to_s }
  #Embulk.logger.info { schema.to_s }
  #Embulk.logger.info { count.to_s }


  # non-resumable output:
  task_reports = yield(task)
  next_config_diff = {}
  return next_config_diff
end

Public Instance Methods

abort() click to toggle source
# File lib/embulk/output/wendelin.rb, line 90
def abort
end
add(page) click to toggle source
# File lib/embulk/output/wendelin.rb, line 66
def add(page)
  # output code:
  #page.each do |record|
    #hash = Hash[schema.names.zip(record)]
  #end
  #Embulk.logger.info { page.to_s }
  page.each do |record|
    #Embulk.logger.info { record.to_s }

    if not @tag.nil?
      tag = @tag
    end
    if not record[1].nil?
      tag = tag + "." + record[1].gsub(File::ALT_SEPARATOR || File::SEPARATOR, '.')[1..-1]
    end

    Embulk.logger.info{ tag }
    @wendelin.ingest(tag, Base64.decode64(record[0]))
  end
end
close() click to toggle source
# File lib/embulk/output/wendelin.rb, line 63
def close
end
commit() click to toggle source
# File lib/embulk/output/wendelin.rb, line 93
def commit
  task_report = {}
  return task_report
end
finish() click to toggle source
# File lib/embulk/output/wendelin.rb, line 87
def finish
end
init() click to toggle source

def self.resume(task, schema, count, &control)

task_reports = yield(task)

next_config_diff = {}
return next_config_diff

end

# File lib/embulk/output/wendelin.rb, line 48
def init
  # initialization code:
  credentials = {}
  #Embulk.logger.info { "Test" }
  #Embulk.logger.info { schema.to_s }
  #Embulk.logger.info "Test"
  @tag = task["tag"]
  streamtool_uri = task["streamtool_uri"]
  unless task["user"].nil?
    credentials["user"] = task["user"]
    credentials["password"] = task["password"]
  end
  @wendelin = WendelinClient.new(streamtool_uri, credentials, Embulk.logger)
end