class LogStash::Inputs::AzureWADTable
Constants
- TICKS_SINCE_EPOCH
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/azurewadtable.rb, line 27 def initialize(*args) super(*args) end
Public Instance Methods
build_latent_query()
click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 55 def build_latent_query @logger.debug("from #{@last_timestamp} to #{@until_timestamp}") query_filter = "(PartitionKey gt '#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{partitionkey_from_datetime(@until_timestamp)}')" for i in 0..99 query_filter << " or (PartitionKey gt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@until_timestamp)}')" end # for block query_filter = query_filter.gsub('"','') query_filter end
build_zero_latency_query()
click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 65 def build_zero_latency_query @logger.debug("from #{@last_timestamp} to most recent data") # query data using start_from_time query_filter = "(PartitionKey gt '#{partitionkey_from_datetime(@last_timestamp)}')" for i in 0..99 query_filter << " or (PartitionKey gt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{i.to_s.rjust(19, '0')}___9999999999999999999')" end # for block query_filter = query_filter.gsub('"','') query_filter end
partitionkey_from_datetime(time_string)
click to toggle source
Windows Azure Diagnostic's algorithm for determining the partition key based on time is as follows:
-
Take time in UTC without seconds.
-
Convert it into .net ticks
-
add a '0' prefix.
# File lib/logstash/inputs/azurewadtable.rb, line 151 def partitionkey_from_datetime(time_string) collection_time = Time.parse(time_string) if collection_time @logger.debug("collection time parsed successfully #{collection_time}") else raise(ArgumentError, "Could not parse the time_string") end # if else block collection_time -= collection_time.sec ticks = to_ticks(collection_time) "0#{ticks}" end
process(output_queue)
click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 76 def process(output_queue) if @data_latency_minutes > 0 @until_timestamp = (Time.now - (60 * @data_latency_minutes)).iso8601 unless @continuation_token query_filter = build_latent_query else query_filter = build_zero_latency_query end @logger.debug("Query filter: " + query_filter) query = { :top => @entity_count_to_process, :filter => query_filter, :continuation_token => @continuation_token } result = @azure_table_service.query_entities(@table_name, query) @continuation_token = result.continuation_token if result and result.length > 0 @logger.debug("#{result.length} results found.") last_good_timestamp = nil result.each do |entity| event = LogStash::Event.new(entity.properties) event.set("type", @table_name) # Help pretty print etw files if (@etw_pretty_print && !event.get("EventMessage").nil? && !event.get("Message").nil?) @logger.debug("event: " + event.to_s) eventMessage = event.get("EventMessage").to_s message = event.get("Message").to_s @logger.debug("EventMessage: " + eventMessage) @logger.debug("Message: " + message) if (eventMessage.include? "%") @logger.debug("starting pretty print") toReplace = eventMessage.scan(/%\d+/) payload = message.scan(/(?<!\\S)([a-zA-Z]+)=(\"[^\"]*\")(?!\\S)/) # Split up the format string to seperate all of the numbers toReplace.each do |key| @logger.debug("Replacing key: " + key.to_s) index = key.scan(/\d+/).join.to_i newValue = payload[index - 1][1] @logger.debug("New Value: " + newValue) eventMessage[key] = newValue end # do block event.set("EventMessage", eventMessage) @logger.debug("pretty print end. result: " + event.get("EventMessage").to_s) end end decorate(event) if event.get('PreciseTimeStamp').is_a?(Time) event.set('PreciseTimeStamp', LogStash::Timestamp.new(event.get('PreciseTimeStamp'))) end theTIMESTAMP = event.get('TIMESTAMP') if theTIMESTAMP.is_a?(LogStash::Timestamp) last_good_timestamp = theTIMESTAMP.to_iso8601 elsif theTIMESTAMP.is_a?(Time) last_good_timestamp = theTIMESTAMP.iso8601 event.set('TIMESTAMP', LogStash::Timestamp.new(theTIMESTAMP)) else @logger.warn("Found result with invalid TIMESTAMP. " + event.to_hash.to_s) end output_queue << event end # each block @idle_delay = 0 if (!last_good_timestamp.nil?) @last_timestamp = last_good_timestamp unless @continuation_token end else @logger.debug("No new results found.") @idle_delay = @idle_delay_seconds end # if block rescue => e @logger.error("Oh My, An error occurred.", :exception => e) raise end
register()
click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 32 def register client = Azure::Storage::Client.create(:storage_account_name => @storage_account_name, :storage_sas_token => @storage_sas_token) @azure_table_service = client.table_client @last_timestamp = @collection_start_time_utc @idle_delay = @idle_delay_seconds @continuation_token = nil end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 42 def run(output_queue) while !stop? @logger.debug("Starting process method @" + Time.now.to_s); process(output_queue) @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s); sleep @idle_delay end # while end
teardown()
click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 52 def teardown end
to_ticks(time_to_convert)
click to toggle source
Convert time to ticks
# File lib/logstash/inputs/azurewadtable.rb, line 165 def to_ticks(time_to_convert) @logger.debug("Converting time to ticks") time_to_convert.to_i * 10000000 - TICKS_SINCE_EPOCH end