class Fluent::ODPSOutput::TableElement

TODO: Merge with SQLInput's TableElement

Attributes

client[R]
log[R]
partitionList[RW]
pattern[R]
writer[R]

Public Class Methods

new(pattern, log) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 63
# Builds a table element bound to one fluentd match pattern.
# pattern: raw match pattern string, compiled via MatchPattern.create.
# log:     plugin logger used for warnings/progress messages.
# Stream writers are created later by init(); start with an empty set.
def initialize(pattern, log)
  super()
  @pattern = MatchPattern.create(pattern)
  @log = log
  @writer = []
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/out_odps.rb, line 277
# Shuts this table element down.
# NOTE(review): loadShard(0) presumably asks the datahub client to release
# all loaded shards -- confirm against the StreamClient API.
def close
  @client.loadShard(0)
end
configure(conf) click to toggle source

Initialize data (builds the record formatter)

Calls superclass method
# File lib/fluent/plugin/out_odps.rb, line 71
# Configures the element from the fluentd <match> section and builds
# @format_proc, which maps an incoming record hash to an array of column
# values ordered by the comma-separated @fields config. Missing keys are
# logged as warnings and contribute nil to the row.
def configure(conf)
  super
  # Split the field list once here instead of on every record.
  field_keys = @fields.split(',')
  @format_proc = Proc.new { |record|
    field_keys.map { |key|
      unless record.has_key?(key)
        @log.warn "the table "+@table+"'s "+key+" field has no matching key in the record"
      end
      record[key]
    }
  }
end
import(chunk) click to toggle source

import data

# File lib/fluent/plugin/out_odps.rb, line 137
# Formats every record in the buffer chunk and uploads it to ODPS.
# When @partition is configured, records are grouped per resolved partition
# name -- the partition spec supports literal names, ${key} substitution and
# ${key.strftime("fmt")} time formatting. Rows are then written in parallel
# by the @thread_number stream writers created in init(); each thread takes
# an equal slice of the rows, with the last thread taking the remainder.
# Raises on format failure or write failure so Fluentd can retry the chunk.
def import(chunk)
  records = []
  partitions=Hash.new
  chunk.msgpack_each { |tag, time, data|
    begin
      #if partition is not empty
      unless @partition.blank? then
        #if partition has params in it
        if @partition.include? "=${"
          #split partition
          partition_arrays=@partition.split(',')
          partition_name=''
          i=1
          for p in partition_arrays do
            #if partition is time formated
            if p.include? "strftime"
              # source key sits between "${" and ".strftime"
              key=p[p.index("{")+1, p.index(".strftime")-1-p.index("{")]
              partition_column=p[0, p.index("=")]
              # strftime format string sits between the quotes inside "(...)"
              timeFormat=p[p.index("(")+2, p.index(")")-3-p.index("(")]
              if data.has_key?(key)
                # NOTE(review): time_format looks like a config_param reader
                # giving the input format of the source field -- confirm.
                # When nil, fall back to Time.parse heuristics.
                if time_format == nil
                  partition_value=Time.parse(data[key]).strftime(timeFormat)
                else
                  partition_value=Time.strptime(data[key], time_format).strftime(timeFormat)
                end
                if i==1
                  partition_name+=partition_column+"="+partition_value
                else
                  partition_name+=","+partition_column+"="+partition_value
                end
              else
                raise "partition has no corresponding source key or the partition expression is wrong,"+data
              end
            else
              # plain ${key} substitution: key sits between "{" and "}"
              key=p[p.index("{")+1, p.index("}")-1-p.index("{")]
              partition_column=p[0, p.index("=")]
              if data.has_key?(key)
                partition_value=data[key]
                if i==1
                  partition_name+=partition_column+"="+partition_value
                else
                  partition_name+=","+partition_column+"="+partition_value
                end
              else
                raise "partition has no corresponding source key or the partition expression is wrong,"+data
              end
            end
            i+=1
          end
        else
          # static partition name, use as-is
          partition_name=@partition
        end
        if partitions[partition_name]==nil
          partitions[partition_name]=[]
        end
        partitions[partition_name] << @format_proc.call(data)

      else
        # no partitioning configured: single flat list of rows
        records << @format_proc.call(data)
      end

    rescue => e
      raise "Failed to format the data:"+e.message
    end
  }

  begin
    #multi thread
    sendThread = Array.new
    unless @partition.blank? then
      # Create any partitions seen in this chunk that don't exist yet,
      # and remember them so later chunks skip the addPartition call.
      partitions.each { |k, v|
        @log.info k
        #if the partition is not exist, create one
        unless @partitionList.include?(k)
          @client.addPartition(k)
          @partitionList << k
          @log.info "add partition "+k
        end
      }
      for thread in 0..@thread_number-1
        sendThread[thread] = Thread.start(thread) do |threadId|
          # FIXME(review): retryTime starts at 0, so the retry branch in the
          # rescue below is unreachable -- writes are never actually retried.
          retryTime = 0
          begin
            # Each thread writes its slice of every partition's rows; the
            # last thread also takes the division remainder.
            partitions.each { |k, v|
              sendCount = v.size/@thread_number
              restCount = 0
              if threadId == @thread_number-1
                restCount = v.size%@thread_number
              end
              @writer[threadId].write(v[sendCount*threadId..sendCount*(threadId+1)+restCount-1], k)
              @log.info "Successfully  import "+(sendCount+restCount).to_s+" data to partition:"+k+",table:"+@table+" at threadId:"+threadId.to_s
            }
          rescue => e
            if retryTime > 0
              @log.info "Fail to write, retry in 2sec. Error at threadId:"+threadId.to_s+" Msg:"+e.message
              sleep(2)
              retryTime -= 1
              retry
            else
              raise e
            end
          end
        end
      end
    else
      @log.info records.size.to_s+" records to be sent"
      for thread in 0..@thread_number-1
        sendThread[thread] = Thread.start(thread) do |threadId|
          # FIXME(review): retryTime starts at 0 -- retry branch unreachable.
          retryTime = 0
          #send data from sendCount*threadId to sendCount*(threadId+1)-1
          sendCount = records.size/@thread_number
          restCount = 0
          if threadId == @thread_number-1
            restCount = records.size%@thread_number
          end
          begin
            @writer[threadId].write(records[sendCount*threadId..sendCount*(threadId+1)+restCount-1])
            @log.info "Successfully import "+(sendCount+restCount).to_s+" data to table:"+@table+" at threadId:"+threadId.to_s
          rescue => e
            if retryTime > 0
              @log.info "Fail to write, retry in 2sec. Error at threadId:"+threadId.to_s+" Msg:"+e.message
              sleep(2)
              retryTime -= 1
              retry
            else
              raise e 
            end
          end
        end
      end
    end
    # Propagate the first thread failure (join re-raises) to Fluentd.
    for thread in 0..@thread_number-1
      sendThread[thread].join
    end
  rescue => e
    # ignore other exceptions to use Fluentd retry
    raise "write records failed,"+e.message
  end
end
init(config) click to toggle source
# File lib/fluent/plugin/out_odps.rb, line 85
# Connects to ODPS Datahub for this table: loads @shard_number shards,
# waits (up to ~300s) for every shard to reach the "loaded" state, creates
# one stream writer per sender thread, and caches the existing partition
# list into @partitionList as "k1=v1,k2=v2" strings.
#
# config: hash with :aliyun_access_id, :aliyun_access_key,
#         :aliyun_odps_endpoint, :aliyun_odps_hub_endpoint, :project.
# Raises if record_batch_size is not positive, if shard loading times out,
# or if any Datahub call fails.
def init(config)
  odpsConfig = OdpsDatahub::OdpsConfig.new(config[:aliyun_access_id],
                                           config[:aliyun_access_key],
                                           config[:aliyun_odps_endpoint],
                                           config[:aliyun_odps_hub_endpoint],
                                           config[:project])
  if @record_batch_size<=0
    raise "the table "+@table+"'s record_batch_size must be greater than 0"
  end
  begin
    @client = OdpsDatahub::StreamClient.new(odpsConfig, config[:project], @table)
    @client.loadShard(@shard_number)
    allLoaded = false
    loadtime=0
    while !allLoaded do
      # Check the timeout at the top of the loop. Previously this check sat
      # after a `break` inside the status block, so a first shard that never
      # loaded would break out before the check and the loop spun forever.
      if loadtime>=300
        raise "Load shard timeout"
      end
      count = 0
      #get json like [{"ShardId": "0","State": "loaded"},{"ShardId": "1","State": "loaded"}]
      @client.getShardStatus.each { |shard|
        if shard["State"] != "loaded"
          # Not ready yet: wait and re-poll all shards from scratch.
          sleep(5)
          loadtime+=5
          break
        end
        count += 1
      }
      if count == @shard_number
        allLoaded = true
        @log.info "All shards are loaded successfully"
      end
    end
    # One stream writer per sender thread used by import().
    for i in 0..@thread_number-1
      @writer[i] = @client.createStreamArrayWriter()
    end
    # Cache existing partitions so import() can skip addPartition for them.
    partitionMaps=@client.getPartitionList
    @partitionList=[]
    for map in partitionMaps do
      partitionName=''
      map.each { |k, v|
        partitionName+=k+"="+v+","
      }
      @partitionList<<partitionName.chomp(",")
    end
  rescue => e
    raise "loadShard failed,"+e.message
  end
end