class OdpsDatahub::Serializer
Public Instance Methods
encodeBool(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 32 def encodeBool(value) [value ? 1 : 0].pack('C') end
encodeDataTime(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 59 def encodeDataTime(value) self.encodeSInt64(value) end
encodeDouble(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 36 def encodeDouble(value) [value].pack('E') end
encodeFixed32(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 76 def encodeFixed32(value) [value].pack('V') end
encodeFixed64(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 71 def encodeFixed64(value) # we don't use 'Q' for pack/unpack. 'Q' is machine-dependent. [value & 0xffff_ffff, value >> 32].pack('VV') end
encodeFixedString(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 80 def encodeFixedString(value) [value].pack('V') end
encodeSInt64(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 40 def encodeSInt64(value) if value >= 0 ::Protobuf::Field::VarintField.encode(value << 1) else ::Protobuf::Field::VarintField.encode(~(value << 1)) end end
encodeString(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 63 def encodeString(value) value_to_encode = value.dup value_to_encode.encode!(::Protobuf::Field::StringField::ENCODING, :invalid => :replace, :undef => :replace, :replace => "") value_to_encode.force_encoding(::Protobuf::Field::BytesField::BYTES_ENCODING) string_bytes = ::Protobuf::Field::VarintField.encode(value_to_encode.size) string_bytes << value_to_encode end
encodeUInt32(value)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 48 def encodeUInt32(value) return [value].pack('C') if value < 128 bytes = [] until value == 0 bytes << (0x80 | (value & 0x7f)) value >>= 7 end bytes[-1] &= 0x7f bytes.pack('C*') end
serialize(upStream, recordList)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 89 def serialize(upStream, recordList) crc32cPack = ::Digest::CRC32c.new if recordList.is_a?Array recordList.each { |record| crc32cRecord = ::Digest::CRC32c.new schema = OdpsTableSchema.new schema = record.getTableSchema schema.mCols.each { | col | cellValue = record.getValue(col.mIdx) if cellValue == nil next end crc32cRecord.update(encodeFixed32(col.mIdx + 1)) case col.mType when $ODPS_BIGINT crc32cRecord.update(encodeFixed64(cellValue)) writeTag(col.mIdx + 1, ::Protobuf::WireType::VARINT, upStream) upStream.write(encodeSInt64(cellValue)) when $ODPS_DOUBLE crc32cRecord.update(encodeDouble(cellValue)) writeTag(col.mIdx + 1, ::Protobuf::WireType::FIXED64, upStream) upStream.write(encodeDouble(cellValue)) when $ODPS_BOOLEAN crc32cRecord.update(encodeBool(cellValue)) writeTag(col.mIdx + 1, ::Protobuf::WireType::VARINT, upStream) upStream.write(encodeBool(cellValue)) when $ODPS_DATETIME crc32cRecord.update(encodeFixed64(cellValue)) writeTag(col.mIdx + 1, ::Protobuf::WireType::VARINT, upStream) upStream.write(encodeDataTime(cellValue)) when $ODPS_STRING crc32cRecord.update(cellValue) writeTag(col.mIdx + 1, ::Protobuf::WireType::LENGTH_DELIMITED, upStream) upStream.write(encodeString(cellValue)) else raise OdpsDatahubException.new($INVALID_ARGUMENT, "invalid mType") end } recordCrc = crc32cRecord.checksum.to_i writeTag($TUNNEL_END_RECORD, ::Protobuf::WireType::VARINT, upStream) upStream.write(encodeUInt32(recordCrc)) crc32cPack.update(encodeFixed32(recordCrc)) } writeTag($TUNNEL_META_COUNT, ::Protobuf::WireType::VARINT, upStream) upStream.write(encodeSInt64(recordList.size)) writeTag($TUNNEL_META_CHECKSUM, ::Protobuf::WireType::VARINT, upStream) upStream.write(encodeUInt32(crc32cPack.checksum)) else raise OdpsDatahubException.new($INVALID_ARGUMENT, "param must be a array") end end
writeTag(idx, type, stream)
click to toggle source
# File lib/fluent/plugin/serialize/serializer.rb, line 84 def writeTag(idx, type, stream) key = (idx << 3) | type stream << ::Protobuf::Field::VarintField.encode(key) end