class OdpsDatahub::StreamClient
Attributes
mOdpsConfig[R]
mOdpsTable[R]
mOdpsTableSchema[R]
mProject[R]
mTable[R]
Public Class Methods
new(odpsConfig, project, table)
click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 30 def initialize(odpsConfig, project, table) @mOdpsConfig = odpsConfig @mProject = project @mTable = table @mShards = Array.new if @mProject == nil or @mProject == "" @mProject = @mOdpsConfig.defaultProjectName end @mOdpsTable = OdpsTable.new(@mOdpsConfig, @mProject, @mTable) header = Hash.new param = Hash.new param[$PARAM_QUERY] = "meta" conn = HttpConnection.new(@mOdpsConfig, header, param, getResource, "GET") res = conn.getResponse jsonTableMeta = JSON.parse(res.body) if res.code != "200" raise OdpsDatahubException.new(jsonTableMeta["Code"], "initialize failed because " + jsonTableMeta["Message"]) end @mOdpsTableSchema = OdpsTableSchema.new(jsonTableMeta["Schema"]) end
Public Instance Methods
addPartition(ptStr)
click to toggle source
ptStr ex: ‘dt=20150805,hh=08,mm=24’ call add partiton if not exsits
# File lib/fluent/plugin/stream_client.rb, line 58 def addPartition(ptStr) @mOdpsTable.addPartition(ptStr) end
createStreamArrayWriter(shardId = nil)
click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 70 def createStreamArrayWriter(shardId = nil) StreamWriter.new(@mOdpsConfig, @mProject, @mTable,getResource, shardId, @mOdpsTableSchema) end
createStreamWriter(shardId = nil)
click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 66 def createStreamWriter(shardId = nil) StreamWriter.new(@mOdpsConfig, @mProject, @mTable,getResource, shardId) end
getOdpsTableSchema()
click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 62 def getOdpsTableSchema return @mOdpsTableSchema end
getPartitionList()
click to toggle source
get partitions and return an array like :[{“time”=>“2016”, “place”=>“china2”},{“time”=>“2015”, “place”=>“china”}]
# File lib/fluent/plugin/stream_client.rb, line 52 def getPartitionList @mOdpsTable.getPartitionList end
getShardStatus()
click to toggle source
return json like [{“ShardId”: “0”,“State”: “loaded”},{“ShardId”: “1”,“State”: “loaded”}]
# File lib/fluent/plugin/stream_client.rb, line 75 def getShardStatus header = Hash.new param = Hash.new param[$PARAM_CURR_PROJECT] = @mProject param[$PARAM_SHARD_STATUS] = "" conn = HttpConnection.new(@mOdpsConfig, header, param, getResource + "/shards", "GET") res = conn.getResponse json_obj = JSON.parse(res.body) if res.code != "200" raise OdpsDatahubException.new(json_obj["Code"], "getShardStatus failed because " + json_obj["Message"]) end return json_obj["ShardStatus"] end
loadShard(idx)
click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 90 def loadShard(idx) if idx < 0 raise OdpsDatahubException.new($INVALID_ARGUMENT, "loadShard num invalid") end header = Hash.new param = Hash.new param[$PARAM_CURR_PROJECT] = @mProject param[$PARAM_SHARD_NUMBER] = idx conn = HttpConnection.new(@mOdpsConfig, header, param, getResource + "/shards", "POST") res = conn.getResponse if res.code != "200" json_obj = JSON.parse(res.body) raise OdpsDatahubException.new(json_obj["Code"], "loadShard failed because " + json_obj["Message"]) end end
Protected Instance Methods
getResource()
click to toggle source
# File lib/fluent/plugin/stream_client.rb, line 107 def getResource return "/projects/" + @mProject + "/tables/" + @mTable end