class OdpsDatahub::StreamWriter

Attributes

mOdpsConfig[R]
mPath[R]
mProject[R]
mRecordList[R]
mShardId[R]
mTable[R]
mUpStream[R]

Public Class Methods

new(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil) click to toggle source
# File lib/fluent/plugin/stream_writer.rb, line 33
# Creates a writer bound to one table path (and optionally one shard).
#
# odpsConfig - configuration object handed unchanged to HttpConnection by #write.
# project    - project name sent as a request parameter by #write.
# table      - table name (stored only; not read by the methods visible here).
# path       - base resource path; #write appends "/shards" or "/shards/<id>".
# shardId    - optional shard id; nil makes #write target "<path>/shards".
# odpsSchema - optional schema; required for #write to accept plain Array rows.
#
# Finishes by calling #reload so the writer starts with empty buffers.
def initialize(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil)
  @mOdpsConfig, @mProject, @mTable = odpsConfig, project, table
  @mPath, @mShardId, @mSchema = path, shardId, odpsSchema
  reload
end

Public Instance Methods

reload() click to toggle source
# File lib/fluent/plugin/stream_writer.rb, line 43
# Discards any buffered state: installs an empty record list and a fresh
# StringIO upload buffer, then switches that buffer to the protobuf bytes
# encoding.
def reload
  @mRecordList = []
  @mUpStream = ::StringIO.new
  @mUpStream.set_encoding(::Protobuf::Field::BytesField::BYTES_ENCODING)
end
write(recordList, partition = "") click to toggle source
# File lib/fluent/plugin/stream_writer.rb, line 49
    # Buffers +recordList+, serializes the buffered records, deflates the
    # result and hands it to an HttpConnection ("PUT") whose response is then
    # checked.
    #
    # recordList - Array whose elements are either OdpsTableRecord instances or
    #              plain Arrays with exactly one value per schema column (the
    #              latter only when a schema was given to the constructor).
    # partition  - value sent as the partition request parameter (default "").
    #
    # Returns nil when the response code is "200".
    #
    # Raises OdpsDatahubException with
    #   $INVALID_ARGUMENT - recordList is not an Array, holds an unsupported
    #                       element, or nothing was serialized;
    #   $PACK_SIZE_EXCEED - the deflated payload is larger than $MAX_PACK_SIZE;
    #   the service "Code" (or the HTTP status when the error body is not a
    #   JSON object) - the response code is not "200".
    #
    # The internal buffers are reset on every exit path, so a rejected batch
    # never leaks records or serialized bytes into the next call.
    def write(recordList, partition = "")
      begin
        unless recordList.is_a?(Array)
          raise OdpsDatahubException.new($INVALID_ARGUMENT, "write param must be a array")
        end

        recordList.each do |value|
          if value.is_a?(OdpsTableRecord)
            # Already a record: buffer as is.
            @mRecordList.push(value)
          elsif value.is_a?(Array) && !@mSchema.nil? && value.size == @mSchema.getColumnCount
            # Plain row: convert through the schema first.
            @mRecordList.push(convert2Record(value))
          else
            raise OdpsDatahubException.new($INVALID_ARGUMENT, "write an error type")
          end
        end

        Serializer.new.serialize(@mUpStream, @mRecordList)
        if @mUpStream.length == 0
          raise OdpsDatahubException.new($INVALID_ARGUMENT, "mRecordList is empty")
        end

        param = Hash.new
        param[$PARAM_CURR_PROJECT] = @mProject
        #TODO partition format
        param[$PARAM_PARTITION] = partition
        param[$PARAM_RECORD_COUNT] = @mRecordList.size.to_s

        # Version 3 wire format: the request body is the raw deflate stream.
        # (A disabled "version 4" variant wrapped the deflated bytes in an
        # OdpsDatahub::XStreamPack with empty pack_meta, serialized that pack
        # into a StringIO and always targeted "/shards/<shardId>".)
        payload = Zlib::Deflate.deflate(@mUpStream.string)
        #MAX_LENGTH 2048KB
        if payload.length > $MAX_PACK_SIZE
          raise OdpsDatahubException.new($PACK_SIZE_EXCEED, "pack size:" + payload.length.to_s)
        end

        header = Hash.new
        header[$CONTENT_ENCODING] = "deflate"
        header[$CONTENT_TYPE] = "application/octet-stream"
        header[$CONTENT_MD5] = Digest::MD5.hexdigest(payload)
        header[$CONTENT_LENGTH] = payload.length.to_s

        target = @mShardId.nil? ? @mPath + "/shards" : @mPath + "/shards/" + @mShardId.to_s
        conn = HttpConnection.new(@mOdpsConfig, header, param, target, "PUT", payload)
      ensure
        # The payload was copied out of the buffers above (or the batch was
        # rejected), so always start the next call from a clean slate.
        reload
      end

      res = conn.getResponse
      return nil if res.code == "200"

      # Only error bodies are parsed; a non-JSON body (e.g. a proxy error page)
      # must still surface as an OdpsDatahubException rather than a parser error.
      errorInfo = begin
        JSON.parse(res.body)
      rescue JSON::ParserError, TypeError
        nil
      end
      if errorInfo.is_a?(Hash)
        raise OdpsDatahubException.new(errorInfo["Code"], "write failed because " + errorInfo["Message"].to_s)
      end
      raise OdpsDatahubException.new(res.code, "write failed because " + res.body.to_s)
    end

Private Instance Methods

convert2Record(value) click to toggle source
# File lib/fluent/plugin/stream_writer.rb, line 115
# Turns one plain row into an OdpsTableRecord built on the writer's schema.
#
# value - Array holding exactly one entry per schema column; nil entries are
#         stored via setNullValue, all others through the setter matching the
#         column type reported by the schema.
#
# Returns the populated OdpsTableRecord.
#
# Raises OdpsDatahubException with $INVALID_ARGUMENT when value is not an
# Array or a column type has no setter here, and with $SCHEMA_NOT_MATCH when
# the number of values differs from the schema's column count.
def convert2Record(value)
  unless value.is_a?(Array)
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "param for convert2Record must be a array")
  end

  unless value.count == @mSchema.getColumnCount
    raise OdpsDatahubException.new($SCHEMA_NOT_MATCH, "column counts are not equal between value and schema")
  end

  row = OdpsTableRecord.new(@mSchema)
  value.each_with_index do |cell, index|
    # The column type is looked up before the nil check, for every column.
    columnType = @mSchema.getColumnType(index)
    if cell.nil?
      row.setNullValue(index)
      next
    end

    case columnType
    when $ODPS_BIGINT   then row.setBigInt(index, cell)
    when $ODPS_BOOLEAN  then row.setBoolean(index, cell)
    when $ODPS_DATETIME then row.setDateTime(index, cell)
    when $ODPS_DOUBLE   then row.setDouble(index, cell)
    when $ODPS_STRING   then row.setString(index, cell)
    else
      raise OdpsDatahubException.new($INVALID_ARGUMENT, "unsupported schema type")
    end
  end
  row
end