class OdpsDatahub::OdpsTable
Public Class Methods
new(odpsConfig, projectName, tableName)
click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 105
# Stores the settings the other methods of this class read back later.
#
# odpsConfig  - kept as-is in @mOdpsConfig (handed to HttpConnection.new by the request methods)
# projectName - kept in @mProjectName (used to build request URLs and SQL text)
# tableName   - kept in @mTableName (used to build request URLs and SQL text)
def initialize(odpsConfig, projectName, tableName)
  @mOdpsConfig, @mProjectName, @mTableName = odpsConfig, projectName, tableName
end
Public Instance Methods
addPartition(ptStr)
click to toggle source
ptStr example: 'dt=20150805,hh=08,mm=24'. Adds the partition if it does not already exist.
# File lib/fluent/plugin/odps/odps_table.rb, line 158
# Adds a partition to the table unless it already exists, by sending an
# "ALTER TABLE <project>.<table> ADD IF NOT EXISTS PARTITION (...);" statement
# through runSQL under the task name "SQLAddPartitionTask".
#
# ptStr - comma-separated key=value pairs, e.g. 'dt=20150805,hh=08,mm=24'
#
# Returns whatever runSQL returns.
# Raises RuntimeError when ptStr holds no pairs, or when a pair does not split
# into exactly a key and a value on '='.
def addPartition(ptStr)
  pts_array = ptStr.split(',')
  # Without this guard an empty spec would produce the malformed statement
  # "... ADD IF NOT EXISTS PARTITION );".
  raise "invalid partition spec: " + ptStr if pts_array.empty?

  # NOTE(review): values are embedded verbatim between single quotes; a value
  # that itself contains a quote would break the statement - confirm callers
  # only pass trusted partition specs.
  assignments = pts_array.map do |pt|
    ptkv = pt.split('=')
    raise "invalid partition spec: " + pt if ptkv.size != 2
    ptkv[0] + '=' + "'" + ptkv[1] + "'"
  end

  sqlstr = "ALTER TABLE " + @mProjectName + "." + @mTableName +
           " ADD IF NOT EXISTS PARTITION (" + assignments.join(',') + ");"
  runSQL("SQLAddPartitionTask", sqlstr)
end
genJobXml(name, priority, comment, taskStr, runMode='sequence')
click to toggle source
TODO: support multiple tasks
# File lib/fluent/plugin/odps/odps_table.rb, line 201
# Builds the job XML by delegating to XmlTemplate.getJobXml with the same
# arguments, in the same order.
#
# runMode defaults to 'sequence'.
#
# Returns the value produced by XmlTemplate.getJobXml.
def genJobXml(name, priority, comment, taskStr, runMode='sequence')
  XmlTemplate.getJobXml(name, priority, comment, taskStr, runMode)
end
getPartitionList()
click to toggle source
Gets the table's partitions and returns them as an array of hashes, e.g. [{"time"=>"2016", "place"=>"china2"}, {"time"=>"2015", "place"=>"china"}]
# File lib/fluent/plugin/odps/odps_table.rb, line 112
# Lists the partitions of the table, requesting page after page until the
# response carries no further marker.
#
# Returns an Array of Hashes, one per <Partition> element, mapping each child
# element's "Name" attribute to its "Value" attribute, e.g.
# [{"time"=>"2016", "place"=>"china2"}, {"time"=>"2015", "place"=>"china"}].
# A response whose code is not "200" ends the listing and returns what has
# been collected so far (an empty Array if it was the first request).
def getPartitionList
  partitionList = []
  url = "/projects/" + @mProjectName + "/tables/" + @mTableName
  lastMarker = nil

  loop do
    header = {}
    param = {}
    param[$PARAM_CURR_PROJECT] = @mProjectName
    param[$PARAM_EXPECT_MARKER] = true
    param[$PARAM_PARTITIONS] = ""
    param[$PARAM_MARKER] = lastMarker unless lastMarker.nil?

    conn = HttpConnection.new(@mOdpsConfig, header, param, url, "GET", "", true)
    res = conn.getResponse
    # Deliberately best-effort: a failed request is not raised.
    # (previously noted alternative: raise OdpsDatahubException.new($INVALID_ARGUMENT, "This not a partitioned table"))
    break if res.code != "200"

    root = REXML::Document.new(res.body.to_s).root
    # An empty or element-less body has no root; nothing more to read.
    break if root.nil?

    root.get_elements("Partition").each do |partition|
      partitionInfo = {}
      partition.elements.each do |column|
        partitionInfo[column.attributes["Name"]] = column.attributes["Value"]
      end
      partitionList.push(partitionInfo)
    end

    # A missing <Marker> element, or one without text, means this was the last page.
    markerElem = root.get_elements("Marker").first
    nextMarker = markerElem && markerElem.text
    break if nextMarker.nil? || nextMarker.empty?
    lastMarker = nextMarker
  end

  partitionList
end
getTaskResult(instanceurl, name)
click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 236
# Fetches the result text of one task of an instance.
#
# Issues a GET on instanceurl with the 'result' parameter and looks under
# Tasks/Task for the task whose <Name> equals name.
#
# instanceurl - request path of the instance
# name        - task name to look for (compared via name.to_s)
#
# Returns the task's <Result> content as a String: the joined CDATA sections
# when there are any, otherwise the element's plain text ("" when the task has
# no <Result>). Returns nil when the response has no root element or no task
# with that name.
def getTaskResult(instanceurl, name)
  headers = {}
  params = { 'result' => "" }
  conn = HttpConnection.new(@mOdpsConfig, headers, params, instanceurl, 'GET', "", true)
  res = conn.getResponse
  root = REXML::Document.new(res.body.to_s).root
  return nil if root.nil?

  root.elements.each('Tasks/Task') do |task|
    nameElem = task.elements['Name']
    next unless nameElem && nameElem.text == name.to_s

    resultElem = task.elements['Result']
    return "" if resultElem.nil?

    # Join the CDATA values themselves; calling to_s on the array of CDATA
    # nodes would yield its inspect form (e.g. ["msg"]) instead of the text.
    cdataText = resultElem.cdatas.map { |cdata| cdata.value }.join
    return cdataText.empty? ? resultElem.text.to_s : cdataText
  end

  # No task matched: do not leak the element list returned by each.
  nil
end
runSQL(taskName, sqlstring)
click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 174
# Submits one SQL statement as a job and waits for it to finish.
#
# Wraps sqlstring in task XML (XmlTemplate.getTaskXml) and job XML (genJobXml),
# POSTs it to "/projects/<project>/instances", then hands the response to
# waitForSQLComplete.
#
# taskName  - name given to the task inside the job
# sqlstring - SQL text to run
#
# Returns whatever waitForSQLComplete returns.
# Raises RuntimeError when the response code is not '200', or when the
# response has a body (content length other than "0") that does not mention
# "Instance"; in the latter case the body is the error message.
def runSQL(taskName, sqlstring)
  task_xml = XmlTemplate.getTaskXml(taskName, sqlstring)
  job_xml = genJobXml('arbitriary_job', '9', "", task_xml)

  headers = {
    'Content-Type' => 'application/xml',
    'Content-MD5' => Digest::MD5.hexdigest(job_xml),
    # Content-Length counts bytes, not characters (they differ for non-ASCII SQL).
    'Content-Length' => job_xml.bytesize.to_s
  }
  params = {}
  url = "/projects/" + @mProjectName + "/instances"
  conn = HttpConnection.new(@mOdpsConfig, headers, params, url, 'POST', job_xml, true)
  res = conn.getResponse

  if res.code != '200'
    raise "Add partition failed with error: " + res.code.to_s
  end

  # waitForSQLComplete reads this same hash as ret_headers['location'][0],
  # i.e. lower-case keys holding arrays, so look the header up the same way;
  # the original spelling and a plain String value are still accepted.
  # NOTE(review): assumes res.to_hash behaves like Net::HTTPResponse#to_hash - confirm.
  resHeaders = res.to_hash
  contentLength = Array(resHeaders['content-length'] || resHeaders['Content-Length']).first
  body = res.body.to_s
  if contentLength != "0" && !body.include?("Instance")
    raise body
  end

  waitForSQLComplete(res)
end
waitForSQLComplete(res)
click to toggle source
# File lib/fluent/plugin/odps/odps_table.rb, line 206
# Polls an instance until its status is 'Terminated', then checks every task.
#
# The instance id is the last path segment of the 'location' header of res;
# the instance is polled at "/projects/<project>/instances/<id>" with the
# 'taskstatus' parameter.
#
# res          - response of the instance-creating request (must answer to_hash)
# pollInterval - seconds to sleep between polls (default 5)
#
# Raises RuntimeError when res has no location header, when a poll response
# has no <Status> element, or when a task's status is not 'Success' (the
# message is that task's result as returned by getTaskResult).
def waitForSQLComplete(res, pollInterval = 5)
  ret_headers = res.to_hash
  location = Array(ret_headers['location']).first
  raise "instance response carries no location header" if location.nil?

  instanceurl = "/projects/" + @mProjectName + "/instances" + "/" + location.split('/')[-1]
  headers = {}
  params = { 'taskstatus' => "" }
  root = nil

  loop do
    conn = HttpConnection.new(@mOdpsConfig, headers, params, instanceurl, 'GET', "", true)
    pollRes = conn.getResponse
    root = REXML::Document.new(pollRes.body.to_s).root
    statusElem = root && root.elements["Status"]
    raise "instance status response carries no Status element" if statusElem.nil?

    break if statusElem.text == 'Terminated'

    # Wait before every re-poll, whatever the non-terminal status is, so an
    # unexpected status cannot turn this into a request loop without pause.
    sleep(pollInterval)
  end

  root.elements.each('Tasks/Task') do |task|
    statusElem = task.elements['Status']
    nameElem = task.elements['Name']
    status = statusElem && statusElem.text
    name = nameElem && nameElem.text
    if status.to_s != 'Success'
      # getTaskResult may not return a String; raise needs one.
      detail = getTaskResult(instanceurl, name.to_s).to_s
      detail = "task " + name.to_s + " ended with status " + status.to_s if detail.empty?
      raise detail
    end
  end
end