class OdpsDatahub::StreamWriter
Attributes
mOdpsConfig [R]
mPath [R]
mProject [R]
mRecordList [R]
mShardId [R]
mTable [R]
mUpStream [R]
Public Class Methods
new(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil)
# File lib/fluent/plugin/stream_writer.rb, line 33
def initialize(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil)
  @mOdpsConfig = odpsConfig
  @mProject = project
  @mTable = table
  @mPath = path
  @mShardId = shardId
  @mSchema = odpsSchema
  reload
end
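new only stores its arguments and calls reload to set up an empty upload buffer and record list; no network call is made until write. A minimal construction sketch, assuming odps_config and odps_schema are objects you have already built elsewhere (connection settings and a table schema object exposing getColumnCount/getColumnType); the project, table, and path values below are placeholders:

  # Sketch only: odps_config and odps_schema are assumed to exist already;
  # the path format here is illustrative, not prescribed by this class.
  writer = OdpsDatahub::StreamWriter.new(odps_config,
                                         "my_project",
                                         "my_table",
                                         "/projects/my_project/tables/my_table",
                                         0,            # shardId, optional
                                         odps_schema)  # needed for array-style write

Passing odpsSchema is optional, but without it write only accepts OdpsTableRecord objects.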
Public Instance Methods
reload()
# File lib/fluent/plugin/stream_writer.rb, line 43
def reload
  @mUpStream = ::StringIO.new
  @mRecordList = Array.new
  @mUpStream.set_encoding(::Protobuf::Field::BytesField::BYTES_ENCODING)
end
write(recordList, partition = "")
# File lib/fluent/plugin/stream_writer.rb, line 49
def write(recordList, partition = "")
  if recordList.is_a?(Array)
    recordList.each { |value|
      # handle RecordList
      if value.is_a?(OdpsTableRecord)
        @mRecordList.push(value)
      # handle ArrayList
      elsif value.is_a?(Array) and @mSchema != nil and value.size == @mSchema.getColumnCount
        record = convert2Record(value)
        @mRecordList.push(record)
      else
        raise OdpsDatahubException.new($INVALID_ARGUMENT, "write an error type")
      end
    }
  else
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "write param must be a array")
  end
  # serialize the buffered records into the protobuf upload stream
  serializer = Serializer.new
  serializer.serialize(@mUpStream, @mRecordList)
  if @mUpStream.length == 0
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "mRecordList is empty")
  end
  header = Hash.new
  param = Hash.new
  param[$PARAM_CURR_PROJECT] = @mProject
  # TODO partition format
  param[$PARAM_PARTITION] = partition
  param[$PARAM_RECORD_COUNT] = @mRecordList.size.to_s
  header[$CONTENT_ENCODING] = "deflate"
  header[$CONTENT_TYPE] = "application/octet-stream"
=begin version 4
  pack = OdpsDatahub::XStreamPack.new
  pack.pack_data = Zlib::Deflate.deflate(@mUpStream.string)
  pack.pack_meta = ""
  upstream = ::StringIO.new
  pack.serialize_to(upstream)
  header[$CONTENT_MD5] = Digest::MD5.hexdigest(upstream.string)
  header[$CONTENT_LENGTH] = upstream.length.to_s
  conn = HttpConnection.new(@mOdpsConfig, header, param, @mPath + "/shards/" + @mShardId.to_s, "PUT", upstream)
=end
  # version 3: deflate-compress the serialized pack and build the request headers
  upStream = Zlib::Deflate.deflate(@mUpStream.string)
  header[$CONTENT_MD5] = Digest::MD5.hexdigest(upStream)
  header[$CONTENT_LENGTH] = upStream.length.to_s
  # MAX_LENGTH 2048KB
  if upStream.length > $MAX_PACK_SIZE
    raise OdpsDatahubException.new($PACK_SIZE_EXCEED, "pack size:" + upStream.length.to_s)
  end
  if @mShardId != nil
    conn = HttpConnection.new(@mOdpsConfig, header, param, @mPath + "/shards/" + @mShardId.to_s, "PUT", upStream)
  else
    conn = HttpConnection.new(@mOdpsConfig, header, param, @mPath + "/shards", "PUT", upStream)
  end
  # clear the buffers, then check the server response
  reload
  res = conn.getResponse
  json_obj = JSON.parse(res.body)
  if res.code != "200"
    raise OdpsDatahubException.new(json_obj["Code"], "write failed because " + json_obj["Message"])
  end
end
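write accepts an Array whose elements are either OdpsTableRecord objects or plain Ruby arrays; an array element is converted via convert2Record only when a schema was supplied to new and the element's size equals the schema's column count. The batch is serialized, deflate-compressed, size-checked against $MAX_PACK_SIZE, and PUT to the shard endpoint; the internal buffers are then reset by reload. A hedged usage sketch, continuing the writer from the constructor example and assuming a two-column (BIGINT, STRING) schema:

  # Array form: each inner array is positional and must match the column count;
  # nil entries become NULL cells (see convert2Record below).
  writer.write([[1001, "hello"], [1002, "world"]])

  # Record form: pre-built OdpsTableRecord objects work without a schema argument.
  record = OdpsTableRecord.new(odps_schema)
  record.setBigInt(0, 1003)
  record.setString(1, "!")
  writer.write([record])

For a partitioned table a partition spec string can be passed as the second argument; its exact format is left open (marked TODO in the source).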
Private Instance Methods
convert2Record(value)
# File lib/fluent/plugin/stream_writer.rb, line 115
def convert2Record(value)
  if not value.is_a?(Array)
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "param for convert2Record must be a array")
  end
  if value.count != @mSchema.getColumnCount
    raise OdpsDatahubException.new($SCHEMA_NOT_MATCH, "column counts are not equal between value and schema")
  end
  record = OdpsTableRecord.new(@mSchema)
  i = 0
  while i < value.count do
    type = @mSchema.getColumnType(i)
    if value[i] == nil
      record.setNullValue(i)
      i += 1
      next
    end
    case type
    when $ODPS_BIGINT
      record.setBigInt(i, value[i])
    when $ODPS_BOOLEAN
      record.setBoolean(i, value[i])
    when $ODPS_DATETIME
      record.setDateTime(i, value[i])
    when $ODPS_DOUBLE
      record.setDouble(i, value[i])
    when $ODPS_STRING
      record.setString(i, value[i])
    else
      raise OdpsDatahubException.new($INVALID_ARGUMENT, "unsupported schema type")
    end
    i += 1
  end
  return record
end
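convert2Record interprets the value array positionally: each element is written with the setter matching the schema's column type, nil becomes a NULL cell, and any type outside BIGINT/BOOLEAN/DATETIME/DOUBLE/STRING raises $INVALID_ARGUMENT. A small illustration of that mapping, assuming a hypothetical three-column schema of (BIGINT, STRING, DATETIME):

  # Passed through write, the array below would be converted roughly as:
  value = [42, "event", Time.now]
  #   record.setBigInt(0, 42)
  #   record.setString(1, "event")
  #   record.setDateTime(2, Time.now)   # accepted value type depends on OdpsTableRecord
  # A nil in any position would call record.setNullValue(i) instead.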