class Fluent::ODPSOutput

Attributes

tables[RW]

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 24
def initialize
  super
  require 'time'
  require_relative 'stream_client'
  @compressor = nil
end

Public Instance Methods

configure(conf)

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.

Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 286
def configure(conf)
  super
  print "configure"
  # You can also refer to raw parameters via conf[name].
  @tables = []
  conf.elements.select { |e|
    e.name == 'table'
  }.each { |e|
    te = TableElement.new(e.arg, log)
    te.configure(e)
    if e.arg.empty?
      log.warn "no table definition"
    else
      @tables << te
    end
  }
  if @tables.empty?
    raise ConfigError, "There is no <table>. <table> is required"
  end
end
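
The filtering step in configure can be sketched outside Fluentd as follows. This is a minimal illustration, not the plugin's code: `Element` and `select_table_args` are hypothetical stand-ins for Fluentd's config elements and for the loop above, and `ArgumentError` stands in for `Fluent::ConfigError`.

```ruby
# Hypothetical stand-in for a Fluentd config element: a name plus an argument.
Element = Struct.new(:name, :arg)

# Keep the arguments of <table> elements that have a non-empty argument,
# and fail if none remain (ArgumentError stands in for Fluent::ConfigError).
def select_table_args(elements)
  tables = elements.select { |e| e.name == 'table' }
                   .reject { |e| e.arg.empty? }
                   .map(&:arg)
  raise ArgumentError, 'There is no <table>. <table> is required' if tables.empty?
  tables
end

elements = [
  Element.new('table', 'app.**'),   # kept
  Element.new('table', ''),         # skipped: no table definition
  Element.new('buffer', 'memory')   # skipped: not a <table> element
]
p select_table_args(elements)
```

Only the first element survives the filter; a configuration without any usable `<table>` element raises.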
emit(tag, es, chain)
Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 357
def emit(tag, es, chain)
  super(tag, es, chain, format_tag(tag))
end
format(tag, time, record)

This method is called when an event reaches Fluentd. It converts the event to a raw string.

# File lib/fluent/plugin/out_odps.rb, line 336
def format(tag, time, record)
  [tag, time, record].to_json + "\n"
end
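
To make the serialized form concrete, the following standalone sketch applies the same expression to a sample event. The name `format_event` and the sample tag, time, and record are illustrative assumptions, not values from the plugin.

```ruby
require 'json'

# Same expression as format above: one JSON array per event, newline-terminated.
def format_event(tag, time, record)
  [tag, time, record].to_json + "\n"
end

line = format_event('app.access', 1_700_000_000, { 'status' => 200 })
print line   # ["app.access",1700000000,{"status":200}]

# Each line can be parsed back into its three parts.
tag, time, record = JSON.parse(line)
```

Because every event ends with a newline, a buffer chunk built from these strings can be split line by line and each line parsed independently.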
format_tag(tag)
# File lib/fluent/plugin/out_odps.rb, line 365
def format_tag(tag)
  if @remove_tag_prefix
    tag.gsub(@remove_tag_prefix, '')
  else
    tag
  end
end
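
One detail of format_tag worth illustrating: `String#gsub` removes every occurrence of the pattern, not only a leading one. The sketch below assumes `@remove_tag_prefix` may hold either a String or a Regexp (the listing above does not show how it is set); `strip_tag` is a hypothetical standalone version of the method.

```ruby
# Standalone version of format_tag; the prefix is passed in explicitly.
def strip_tag(tag, remove_tag_prefix)
  return tag unless remove_tag_prefix
  tag.gsub(remove_tag_prefix, '')
end

a = strip_tag('odps.app.log', 'odps.')            # "app.log"
b = strip_tag('odps.app.odps.log', 'odps.')       # "app.log" (both occurrences removed)
c = strip_tag('odps.app.odps.log', /\Aodps\./)    # "app.odps.log" (anchored to the start)
d = strip_tag('app.log', nil)                     # "app.log" (no prefix configured)
```

If only a leading prefix should be dropped, an anchored Regexp such as the one in the third call avoids touching later occurrences.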
shutdown()

This method is called when shutting down. Shut down the thread and close sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 327
def shutdown
  super
  @tables.reject! do |te|
    te.close()
  end
end
start()

This method is called when starting. Open sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 309
def start
  super
  config = {
      :aliyun_access_id => @aliyun_access_id,
      :aliyun_access_key => @aliyun_access_key,
      :project => @project,
      :aliyun_odps_endpoint => @aliyun_odps_endpoint,
      :aliyun_odps_hub_endpoint => @aliyun_odps_hub_endpoint,
  }
  # Initialize each table object
  @tables.each { |te|
    te.init(config)
  }
  log.info "the table object size is "+@tables.size.to_s
end
write(chunk)

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE! This method is called by an internal thread, not Fluentd’s main thread, so IO waits do not affect other plugins.

# File lib/fluent/plugin/out_odps.rb, line 347
def write(chunk)
  # For each table, choose the table object whose pattern matches chunk.key (data = chunk.read)
  @tables.each { |table|
    if table.pattern.match(chunk.key)
      log.info "Begin to import the data and the table_match is "+chunk.key
      return table.import(chunk)
    end
  }
end
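
The dispatch in write is first-match-wins: the `return` inside the block leaves the method as soon as one table's pattern matches the chunk key, so later tables are never tried. A minimal sketch of that behaviour, with hypothetical names (`Table`, `pick_table`) and plain Regexp objects standing in for the plugin's match patterns:

```ruby
# Hypothetical stand-in for a configured table: a name and a match pattern.
Table = Struct.new(:name, :pattern)

# Return the name of the first table whose pattern matches the key.
# Unlike the listing above, this sketch returns nil when nothing matches.
def pick_table(tables, key)
  tables.each do |table|
    return table.name if table.pattern.match(key)
  end
  nil
end

tables = [
  Table.new('app_logs', /\Aapp\./),
  Table.new('web_logs', /\Aapp\.web\./)   # never reached for app.web.* keys
]

first = pick_table(tables, 'app.web.access')   # "app_logs"
none  = pick_table(tables, 'db.slow')          # nil
```

Under this scheme the order of the `<table>` sections matters: a broad pattern listed first shadows narrower ones that follow it.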