class LogStash::Inputs::AzureWADTable

Constants

TICKS_SINCE_EPOCH

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/logstash/inputs/azurewadtable.rb, line 27
# Builds the input instance. Every positional argument is handed unchanged
# to the superclass initializer; nothing else is set up here.
#
# @param args [Array] arguments forwarded to the superclass
def initialize(*args)
  super
end

Public Instance Methods

build_latent_query() click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 55
# Builds the table query filter for the window between @last_timestamp and
# @until_timestamp.
#
# The filter is the plain partition-key range OR-ed with the same range under
# each of the 100 prefixes "0000000000000000000___" .. "0000000000000000099___".
# Any double quote in the assembled filter is removed.
#
# Both partition keys are computed once up front; they do not change between
# loop iterations, so there is no need to re-parse the timestamps per prefix.
#
# @return [String] the filter expression
def build_latent_query
  @logger.debug("from #{@last_timestamp} to #{@until_timestamp}")
  from_key = partitionkey_from_datetime(@last_timestamp)
  until_key = partitionkey_from_datetime(@until_timestamp)

  clauses = ["(PartitionKey gt '#{from_key}' and PartitionKey lt '#{until_key}')"]
  (0..99).each do |i|
    prefix = i.to_s.rjust(19, '0')
    clauses << "(PartitionKey gt '#{prefix}___#{from_key}' and PartitionKey lt '#{prefix}___#{until_key}')"
  end

  clauses.join(' or ').delete('"')
end
build_zero_latency_query() click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 65
# Builds the table query filter for everything newer than @last_timestamp,
# with no upper time bound.
#
# The filter is the plain "greater than" partition-key test OR-ed with, for
# each of the 100 prefixes "0000000000000000000___" .. "0000000000000000099___",
# the range from the prefixed key up to the prefixed value
# 9999999999999999999. Any double quote in the assembled filter is removed.
#
# The partition key is computed once up front; it does not change between
# loop iterations, so there is no need to re-parse the timestamp per prefix.
#
# @return [String] the filter expression
def build_zero_latency_query
  @logger.debug("from #{@last_timestamp} to most recent data")
  from_key = partitionkey_from_datetime(@last_timestamp)

  clauses = ["(PartitionKey gt '#{from_key}')"]
  (0..99).each do |i|
    prefix = i.to_s.rjust(19, '0')
    clauses << "(PartitionKey gt '#{prefix}___#{from_key}' and PartitionKey lt '#{prefix}___9999999999999999999')"
  end

  clauses.join(' or ').delete('"')
end
partitionkey_from_datetime(time_string) click to toggle source

Windows Azure Diagnostics' algorithm for determining the partition key from a time is as follows:

  1. Take the time in UTC, without seconds.

  2. Convert it into .NET ticks.

  3. Add a '0' prefix.

# File lib/logstash/inputs/azurewadtable.rb, line 151
# Converts a time string into a partition key: the parsed time with its
# seconds component subtracted, converted by to_ticks, with a "0" prefixed.
#
# @param time_string [String] a time in any format Time.parse accepts
# @return [String] "0" followed by the tick count
# @raise [ArgumentError] when time_string cannot be parsed
def partitionkey_from_datetime(time_string)
  begin
    collection_time = Time.parse(time_string)
  rescue ArgumentError => e
    # Time.parse signals failure by raising, never by returning nil, so the
    # failure has to be handled here rather than with a nil check.
    raise ArgumentError, "Could not parse the time_string: #{e.message}"
  end
  @logger.debug("collection time parsed successfully #{collection_time}")

  # Drop the seconds so the key falls on a whole minute.
  collection_time -= collection_time.sec
  "0#{to_ticks(collection_time)}"
end
process(output_queue) click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 76
# Runs one polling cycle: queries the table, turns every returned entity into
# a LogStash::Event and appends it to output_queue.
#
# Uses the latent filter when @data_latency_minutes is positive and the
# zero-latency filter otherwise. Updates @continuation_token from the result,
# sets @idle_delay to 0 when entities were returned (otherwise to
# @idle_delay_seconds), and advances @last_timestamp to the last valid
# TIMESTAMP seen when no continuation token is pending.
#
# @param output_queue [#<<] receives one event per returned entity
# @raise [StandardError] any error is logged and then re-raised unchanged
def process(output_queue)
  if @data_latency_minutes > 0
    # The upper bound is only recomputed when no continuation token is pending.
    @until_timestamp = (Time.now - (60 * @data_latency_minutes)).iso8601 unless @continuation_token
    query_filter = build_latent_query
  else
    query_filter = build_zero_latency_query
  end
  @logger.debug("Query filter: " + query_filter)
  query = { :top => @entity_count_to_process, :filter => query_filter, :continuation_token => @continuation_token }
  result = @azure_table_service.query_entities(@table_name, query)
  # Check for nil before dereferencing so a nil result is handled as "no results".
  @continuation_token = result ? result.continuation_token : nil

  if result && result.length > 0
    @logger.debug("#{result.length} results found.")
    last_good_timestamp = nil
    result.each do |entity|
      event = LogStash::Event.new(entity.properties)
      event.set("type", @table_name)

      # Help pretty print etw files
      pretty_print_etw_message(event) if @etw_pretty_print
      decorate(event)

      precise = event.get('PreciseTimeStamp')
      event.set('PreciseTimeStamp', LogStash::Timestamp.new(precise)) if precise.is_a?(Time)

      timestamp = event.get('TIMESTAMP')
      if timestamp.is_a?(LogStash::Timestamp)
        last_good_timestamp = timestamp.to_iso8601
      elsif timestamp.is_a?(Time)
        last_good_timestamp = timestamp.iso8601
        event.set('TIMESTAMP', LogStash::Timestamp.new(timestamp))
      else
        @logger.warn("Found result with invalid TIMESTAMP. " + event.to_hash.to_s)
      end
      output_queue << event
    end
    @idle_delay = 0
    # The checkpoint only moves when a valid timestamp was seen and no
    # continuation token is pending.
    @last_timestamp = last_good_timestamp if last_good_timestamp && !@continuation_token
  else
    @logger.debug("No new results found.")
    @idle_delay = @idle_delay_seconds
  end
rescue => e
  @logger.error("Oh My, An error occurred.", :exception => e)
  raise
end

# Expands each %N placeholder in the event's "EventMessage" with the quoted
# value of the Nth key="value" pair found in its "Message" field and stores
# the result back on the event. Does nothing when either field is nil or the
# format string contains no "%". A placeholder with no matching pair is left
# in the text as-is instead of raising.
def pretty_print_etw_message(event)
  return if event.get("EventMessage").nil? || event.get("Message").nil?

  @logger.debug("event: " + event.to_s)
  event_message = event.get("EventMessage").to_s
  message = event.get("Message").to_s
  @logger.debug("EventMessage: " + event_message)
  @logger.debug("Message: " + message)
  return unless event_message.include?("%")

  @logger.debug("starting pretty print")
  # NOTE(review): inside a regex literal "\\S" matches a literal backslash
  # followed by "S", not the non-whitespace class. Left unchanged so the set
  # of matched pairs stays the same; confirm whether \S was intended.
  payload = message.scan(/(?<!\\S)([a-zA-Z]+)=(\"[^\"]*\")(?!\\S)/)

  # Substitute in a single pass so text inserted for one placeholder is never
  # rescanned as another placeholder.
  expanded = event_message.gsub(/%\d+/) do |key|
    @logger.debug("Replacing key: " + key.to_s)
    index = key[1..-1].to_i
    pair = index > 0 ? payload[index - 1] : nil
    if pair.nil?
      @logger.debug("No payload value for key: " + key.to_s)
      next key
    end
    @logger.debug("New Value: " + pair[1])
    pair[1]
  end

  event.set("EventMessage", expanded)
  @logger.debug("pretty print end. result: " + event.get("EventMessage").to_s)
end
private :pretty_print_etw_message
register() click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 32
# Prepares the plugin for polling: creates the storage client from the
# configured account name and SAS token, keeps its table client in
# @azure_table_service, and resets the polling state (@last_timestamp,
# @idle_delay, @continuation_token).
def register
  @azure_table_service = Azure::Storage::Client.create(
    :storage_account_name => @storage_account_name,
    :storage_sas_token => @storage_sas_token
  ).table_client

  # Start from the configured collection time with no page pending.
  @last_timestamp = @collection_start_time_utc
  @idle_delay = @idle_delay_seconds
  @continuation_token = nil
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 42
# Polling loop: until stop? reports true, runs one process cycle and then
# sleeps for the number of seconds currently held in @idle_delay.
#
# @param output_queue [Object] passed straight through to process
def run(output_queue)
  until stop?
    @logger.debug("Starting process method @#{Time.now}")
    process(output_queue)
    @logger.debug("Starting delay of: #{@idle_delay} seconds @#{Time.now}")
    sleep @idle_delay
  end
end
teardown() click to toggle source
# File lib/logstash/inputs/azurewadtable.rb, line 52
# Teardown hook. The body is empty, so calling it performs no work and
# returns nil.
def teardown; end
to_ticks(time_to_convert) click to toggle source

Convert time to ticks

# File lib/logstash/inputs/azurewadtable.rb, line 165
# Converts a time into a tick count: whole seconds since the Unix epoch
# scaled by 10,000,000, minus the TICKS_SINCE_EPOCH constant.
#
# @param time_to_convert [#to_i] value whose to_i yields epoch seconds
# @return [Integer] the tick count
def to_ticks(time_to_convert)
  @logger.debug("Converting time to ticks")
  epoch_ticks = time_to_convert.to_i * 10_000_000
  epoch_ticks - TICKS_SINCE_EPOCH
end