class LogStash::Inputs::AzureBlobStorage

This is a Logstash input plugin for files in Azure Blob Storage. There is a storage explorer in the portal and an application with the same name at storageexplorer.com. A storage account has a globally unique name, {storageaccount}.blob.core.windows.net, which is a CNAME to Azure's blob servers blob.*.store.core.windows.net. A storage account holds containers, which contain directories and blobs (like files). Blobs consist of one or more blocks; after the blocks are written, they can be committed. Some Azure diagnostics can send events to an Event Hub, which can be parsed with the plugin logstash-input-azure_event_hubs, but for events that are only stored in a storage account, use this plugin. The original logstash-input-azureblob from azure-diagnostics-tools is great for low volumes, but it suffers from an outdated client, slow reads, lease locking issues and JSON parse errors. azure.microsoft.com/en-us/services/storage/blobs/
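As a quick illustration of that hierarchy, here is a minimal sketch using the azure-storage-blob gem this plugin builds on; the account name, container name and environment variable are hypothetical:

    require 'azure/storage/blob'

    # hypothetical credentials, for illustration only
    client = Azure::Storage::Blob::BlobService.create(
        storage_account_name: 'examplestorage',
        storage_access_key: ENV['AZURE_ACCESS_KEY'])
    # storage account -> container -> blobs -> committed blocks
    client.list_blobs('insights-logs-networksecuritygroupflowevent').each do |blob|
        blocks = client.list_blob_blocks('insights-logs-networksecuritygroupflowevent', blob.name)[:committed]
        puts "#{blob.name}: #{blob.properties[:content_length]} bytes in #{blocks.size} committed blocks"
    end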

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 313
def close
    save_registry(@registry)
end
register() click to toggle source

TODO: Other feature requests: show the file path in the logger, add the filepath as part of the log message, an option to keep the registry on local disk.

# File lib/logstash/inputs/azure_blob_storage.rb, line 106
def register
    @pipe_id = Thread.current[:name].split("[").last.split("]").first
    @logger.info("=== #{config_name} #{Gem.loaded_specs["logstash-input-"+config_name].version.to_s} / #{@pipe_id} / #{@id[0,6]} / ruby #{ RUBY_VERSION }p#{ RUBY_PATCHLEVEL } ===")
    @logger.info("If this plugin doesn't work, please raise an issue in https://github.com/janmg/logstash-input-azure_blob_storage")
    # TODO: consider multiple readers, so add pipeline @id or use logstash-to-logstash communication?
    # TODO: Implement retry ... Error: Connection refused - Failed to open TCP connection to
end
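The pipe_id above is derived from the worker thread's name; a toy version of the extraction, assuming Logstash names its input threads like "[main]<azure_blob_storage" (the exact format is an assumption here):

    '[main]<azure_blob_storage'.split('[').last.split(']').first  # => "main"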
run(queue) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 116
def run(queue)
    # counter for all processed events since the start of this pipeline
    @processed = 0
    @regsaved = @processed

    connect

    @registry = Hash.new
    if registry_create_policy == "resume"
        for counter in 1..3
            begin
                if !@registry_local_path.nil?
                    # make sure the local registry directory exists for later saves
                    FileUtils.mkdir_p(@registry_local_path) unless Dir.exist?(@registry_local_path)
                    if File.file?(@registry_local_path+"/"+@pipe_id)
                        @registry = Marshal.load(File.read(@registry_local_path+"/"+@pipe_id))
                        @logger.info("resuming from local registry #{registry_local_path+"/"+@pipe_id}")
                    else
                        # get_blob returns [0] headers and [1] the response body
                        @registry = Marshal.load(@blob_client.get_blob(container, registry_path)[1])
                        @logger.info("migrating from remote registry #{registry_path}")
                    end
                else
                    # get_blob returns [0] headers and [1] the response body
                    @registry = Marshal.load(@blob_client.get_blob(container, registry_path)[1])
                    @logger.info("resuming from remote registry #{registry_path}")
                end
                break
            rescue Exception => e
                @logger.error("caught: #{e.message}")
                @registry.clear
                @logger.error("loading registry failed for attempt #{counter} of 3")
            end
        end
    end
    # read filelist and set offsets to file length to mark all the old files as done
    if registry_create_policy == "start_fresh"
        @registry = list_blobs(true)
        save_registry(@registry)
        @logger.info("starting fresh, writing a clean registry to contain #{@registry.size} blobs/files")
    end

    @is_json = @codec.class.name.eql?("LogStash::Codecs::JSON")
    @head = ''
    @tail = ''
    # if codec=json, sniff one file's blocks A and Z to learn file_head and file_tail
    if @is_json
        if file_head
            @head = file_head
        end
        if file_tail
            @tail = file_tail
        end
        if file_head and file_tail and !skip_learning
            learn_encapsulation
        end
        @logger.info("head will be: #{@head} and tail is set to #{@tail}")
    end

    newreg   = Hash.new
    filelist = Hash.new
    worklist = Hash.new
    @last = start = Time.now.to_i

    # This is the main loop, it
    # 1. Lists all the files in the remote storage account that match the path prefix
    # 2. Filters on path_filters to only include files that match the directory and file glob (**/*.json)
    # 3. Saves the listed files in a registry of known files and filesizes.
    # 4. Lists all the files again, compares the registry with the new filelist and puts the delta in a worklist
    #    (see the standalone sketch after this method)
    # 5. Processes the worklist and puts all events in the logstash queue.
    # 6. If there is time left, sleeps to complete the interval. If processing takes more than an interval, saves the registry and continues.
    # 7. If the stop signal comes, finishes the current file, saves the registry and quits
    while !stop?
        # load the registry, compare its offsets to the file list, set the offset to 0 for new files, process the whole list and if finished within the interval wait for the next loop
        # TODO: sort by timestamp ?
        #filelist.sort_by(|k,v|resource(k)[:date])
        worklist.clear
        filelist.clear
        newreg.clear

        # Listing all the files
        filelist = list_blobs(false)
        filelist.each do |name, file|
            # take the offset from the registry if the file is known, otherwise start at 0
            off = @registry.key?(name) ? @registry[name][:offset] : 0
            newreg.store(name, { :offset => off, :length => file[:length] })
            if (@debug_until > @processed) then @logger.info("2: adding offsets: #{name} #{off} #{file[:length]}") end
        end
        # size nilClass when the list doesn't grow?!
        # Worklist is the subset of files where the already read offset is smaller than the file size
        chunk = nil

        worklist = newreg.select {|name,file| file[:offset] < file[:length]}
        if (worklist.size > 4) then @logger.info("worklist contains #{worklist.size} blobs") end

        # Start of processing
        # This would be ideal for threading since it's IO intensive, would be nice with a ruby native ThreadPool
        if (worklist.size > 0) then
          worklist.each do |name, file|
            start = Time.now.to_i
            if (@debug_until > @processed) then @logger.info("3: processing #{name} from #{file[:offset]} to #{file[:length]}") end
            size = 0
            if file[:offset] == 0
                # This is where the Sera4000 issue starts
                # For an append blob, reading full and crashing, retry, last_modified? ... length? ... committed? ...
                # length and skip reg value
                if (file[:length] > 0)
                    begin
                        chunk = full_read(name)
                        size = chunk.size
                    rescue Exception => e
                        @logger.error("Failed to read #{name} because of: #{e.message} .. will continue, set file as read and pretend this never happened")
                        @logger.error("#{size} size and #{file[:length]} file length")
                        size = file[:length]
                    end 
                else
                    @logger.info("found a zero size file #{name}")
                    chunk = nil 
                end
            else
                chunk = partial_read_json(name, file[:offset], file[:length])
                @logger.debug("partial file #{name} from #{file[:offset]} to #{file[:length]}")
                # the partial read consumed the blob up to file[:length], store that as the new offset
                size = file[:length]
            end
            if logtype == "nsgflowlog" && @is_json
              # skip empty chunks
              unless chunk.nil?
                res = resource(name)
                begin
                    fingjson = JSON.parse(chunk)
                    @processed += nsgflowlog(queue, fingjson, name)
                    @logger.debug("Processed #{res[:nsg]} [#{res[:date]}] #{@processed} events")
                rescue JSON::ParserError
                    @logger.error("parse error on #{res[:nsg]} [#{res[:date]}] offset: #{file[:offset]} length: #{file[:length]}")
                end
              end
            # TODO: Convert this to line based grokking.
            # TODO: ECS Compliance?
            elsif logtype == "wadiis" && !@is_json
                @processed += wadiislog(queue, name)
            else
                counter = 0
                begin
                    @codec.decode(chunk) do |event|
                        counter += 1
                        if @addfilename
                            event.set('filename', name)
                        end
                        decorate(event)
                        queue << event
                    end
                rescue Exception => e
                    @logger.error("codec exception: #{e.message} .. will continue and pretend this never happened")
                    @registry.store(name, { :offset => file[:length], :length => file[:length] })
                    @logger.debug("#{chunk}")
                end
                @processed += counter
            end
            @registry.store(name, { :offset => size, :length => file[:length] })
            # TODO add input plugin option to prevent connection cache
            @blob_client.client.reset_agents!
            #@logger.info("name #{name} size #{size} len #{file[:length]}")
            # if stop? good moment to stop what we're doing
            if stop?
                return
            end
            if ((Time.now.to_i - @last) > @interval)
                save_registry(@registry)
            end
          end
        end
        # The files that got processed after the last registry save need to be saved too, in case the worklist is empty for some intervals.
        now = Time.now.to_i
        if ((now - @last) > @interval)
            save_registry(@registry)
        end
        sleeptime = interval - ((now - start) % interval)
        if @debug_timer
            @logger.info("going to sleep for #{sleeptime} seconds")
        end
        Stud.stoppable_sleep(sleeptime) { stop? }
    end
end
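Steps 3 and 4 of the loop above reduce to a delta between the registry and a fresh listing. A standalone sketch of that computation, with invented sample data:

    # registry: what has been read so far; listing: what list_blobs just returned
    registry = { 'a.json' => { :offset => 100, :length => 100 } }
    listing  = { 'a.json' => { :offset => 0, :length => 150 },
                 'b.json' => { :offset => 0, :length => 40 } }

    newreg = Hash.new
    listing.each do |name, file|
        off = registry.key?(name) ? registry[name][:offset] : 0
        newreg.store(name, { :offset => off, :length => file[:length] })
    end
    worklist = newreg.select { |name, file| file[:offset] < file[:length] }
    # => 'a.json' still has 50 unread bytes, 'b.json' is new with 40 unread bytes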
stop() click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 310
def stop
    save_registry(@registry)
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 319
def connect
    # Try in this order to access the storageaccount
    # 1. storageaccount / sas_token
    # 2. connection_string
    # 3. storageaccount / access_key

    unless connection_string.nil?
        conn = connection_string.value
    end
    unless sas_token.nil?
        unless sas_token.value.start_with?('?')
            conn = "BlobEndpoint=https://#{storageaccount}.#{dns_suffix};SharedAccessSignature=#{sas_token.value}"
        else
            conn = sas_token.value
        end
    end
    if conn.nil?
#        unless use_development_storage?
        @blob_client = Azure::Storage::Blob::BlobService.create(
            storage_account_name: storageaccount,
            storage_dns_suffix: dns_suffix,
            storage_access_key: access_key.value,
        )
#        else
#            @logger.info("not yet implemented")
#        end
    else
        @blob_client = Azure::Storage::Blob::BlobService.create_from_connection_string(conn)
    end
end
full_read(filename) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 350
def full_read(filename)
    tries ||= 2
    begin
        return @blob_client.get_blob(container, filename)[1]
    rescue Exception => e
        @logger.error("caught: #{e.message} for full_read")
        if (tries -= 1) > 0
           # when the TCP connection was dropped, reconnect before retrying
           if e.message == "Connection reset by peer"
               connect
           end
           retry
        end
    end
    # one final attempt after the retries are exhausted
    return @blob_client.get_blob(container, filename)[1]
end
learn_encapsulation() click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 516
def learn_encapsulation
  @logger.info("learn_encapsulation, this can be skipped by setting skip_learning => true. Or set both head_file and tail_file")
    # From one file, read first block and last block to learn head and tail
    begin
        blobs = @blob_client.list_blobs(container, { max_results: 3, prefix: @prefix})
        blobs.each do |blob|
            unless blob.name == registry_path
              begin
                blocks = @blob_client.list_blob_blocks(container, blob.name)[:committed]
                if blocks.first.name.start_with?('A00')
                  @logger.debug("using #{blob.name}/#{blocks.first.name} to learn the json header")
                  @head = @blob_client.get_blob(container, blob.name, start_range: 0, end_range: blocks.first.size-1)[1]
                end
                if blocks.last.name.start_with?('Z00')
                  @logger.debug("using #{blob.name}/#{blocks.last.name} to learn the json footer")
                  length = blob.properties[:content_length].to_i
                  offset = length - blocks.last.size
                  @tail = @blob_client.get_blob(container, blob.name, start_range: offset, end_range: length-1)[1]
                  @logger.debug("learned tail: #{@tail}")
                end
              rescue Exception => e
                @logger.info("learn json one of the attempts failed #{e.message}")
              end
            end
        end
    rescue Exception => e
        @logger.info("learn json header and footer failed because #{e.message}")
    end
end
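The byte ranges fetched above follow from the block naming convention in NSG flow log blobs (an A00... block holding the JSON head, a Z00... block holding the tail). A toy illustration with invented sizes:

    length      = 1_000_000   # hypothetical blob content_length
    first_block = 12          # size of the A00... block with the json header
    last_block  = 2           # size of the Z00... block with the json footer
    head_range  = [0, first_block - 1]               # start_range/end_range for get_blob
    tail_range  = [length - last_block, length - 1]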
list_blobs(fill) click to toggle source

List all blobs in the blobstore, set the offsets from the registry and return the filelist. Inspired by: github.com/Azure-Samples/storage-blobs-ruby-quickstart/blob/master/example.rb

# File lib/logstash/inputs/azure_blob_storage.rb, line 441
def list_blobs(fill)
    tries ||= 3
    begin
        return try_list_blobs(fill)
    rescue Exception => e
        @logger.error("caught: #{e.message} for list_blobs retries left #{tries}")
        if (tries -= 1) > 0
           retry
        end
    end
end
nsgflowlog(queue, json, name) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 389
def nsgflowlog(queue, json, name)
    count=0
    json["records"].each do |record|
      res = resource(record["resourceId"])
      resource = { :subscription => res[:subscription], :resourcegroup => res[:resourcegroup], :nsg => res[:nsg] }
      @logger.trace(resource.to_s)
      record["properties"]["flows"].each do |flows|
          rule = resource.merge ({ :rule => flows["rule"]})
          flows["flows"].each do |flowx|
              flowx["flowTuples"].each do |tup|
                  tups = tup.split(',')
                  ev = rule.merge({:unixtimestamp => tups[0], :src_ip => tups[1], :dst_ip => tups[2], :src_port => tups[3], :dst_port => tups[4], :protocol => tups[5], :direction => tups[6], :decision => tups[7]})
                  if (record["properties"]["Version"]==2)
                    tups[9] = 0 if tups[9].nil?
                    tups[10] = 0 if tups[10].nil?
                    tups[11] = 0 if tups[11].nil?
                    tups[12] = 0 if tups[12].nil?
                      ev.merge!( {:flowstate => tups[8], :src_pack => tups[9], :src_bytes => tups[10], :dst_pack => tups[11], :dst_bytes => tups[12]} )
                  end
                  @logger.trace(ev.to_s)
                  if @addfilename
                      ev.merge!( {:filename => name } )
                  end
                  event = LogStash::Event.new('message' => ev.to_json)
                  decorate(event)
                  queue << event
                  count+=1
              end
          end
      end
    end
    return count 
end
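For reference, an illustrative version-2 flow tuple (values invented for this example) and how the indices above map onto it:

    tups = '1542110377,10.0.0.4,52.240.48.24,44931,443,T,O,A,E,25,4096,24,8192'.split(',')
    # tups[0] unixtimestamp, [1] src_ip, [2] dst_ip, [3] src_port, [4] dst_port,
    # [5] protocol, [6] direction, [7] decision; version 2 adds:
    # [8] flowstate, [9] src_pack, [10] src_bytes, [11] dst_pack, [12] dst_bytes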
partial_read_json(filename, offset, length) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 369
def partial_read_json(filename, offset, length)
    content = @blob_client.get_blob(container, filename, start_range: offset-@tail.length, end_range: length-1)[1]
    if content.end_with?(@tail)
        # the tail is part of the last block, so included in the total length of the get_blob
        return @head + strip_comma(content)
    else
        # when the file has grown between list_blobs and the time of partial reading, the tail will be wrong
        return @head + strip_comma(content[0...-@tail.length]) + @tail
    end
end
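A toy round trip of that stitching, using the strip_comma helper documented below and a head and tail as learned from an NSG flow log:

    head = '{"records":['
    tail = ']}'
    content = ',{"second":2}]}'     # the bytes fetched from offset-tail.length to the end
    head + strip_comma(content)     # => '{"records":[{"second":2}]}', complete parseable json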
resource(str) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 546
def resource(str)
      temp = str.split('/')
      date = '---'
      unless temp[9].nil?
        date = val(temp[9])+'/'+val(temp[10])+'/'+val(temp[11])+'-'+val(temp[12])+':00'
      end
      return {:subscription=> temp[2], :resourcegroup=>temp[4], :nsg=>temp[8], :date=>date}
end
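A sample NSG flow log blob path (with invented IDs) and what resource() extracts from it:

    name = 'resourceId=/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/TESTRG/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/TESTNSG/y=2023/m=01/d=02/h=03/m=00/macAddress=000D3AF87856/PT1H.json'
    resource(name)
    # => {:subscription=>"00000000-0000-0000-0000-000000000000", :resourcegroup=>"TESTRG",
    #     :nsg=>"TESTNSG", :date=>"2023/01/02-03:00"}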
save_registry(filelist) click to toggle source

If events were processed since the last registry save, start a thread to update the registry file.

# File lib/logstash/inputs/azure_blob_storage.rb, line 488
def save_registry(filelist)
    # Because of threading, @processed and @regsaved are not thread safe; as instance variables they can change while this method runs! Most of the time this is fine because the registry is the last resort, but be careful about corner cases!
    unless @processed == @regsaved
        @regsaved = @processed
        unless (@busy_writing_registry)
            Thread.new {
                begin
                    @busy_writing_registry = true
                    if (@registry_local_path)
                        File.open(@registry_local_path+"/"+@pipe_id, 'w') { |file| file.write(Marshal.dump(filelist)) }
                        @logger.info("processed #{@processed} events, saving #{filelist.size} blobs and offsets to local registry #{registry_local_path+"/"+@pipe_id}")
                    else
                        @blob_client.create_block_blob(container, registry_path, Marshal.dump(filelist))
                        @logger.info("processed #{@processed} events, saving #{filelist.size} blobs and offsets to remote registry #{registry_path}")
                    end
                    @last = Time.now.to_i
                rescue
                    @logger.error("Oh my, registry write failed, do you have write access?")
                ensure
                    # always clear the flag, otherwise a failed write would block every future registry save
                    @busy_writing_registry = false
                end
            }
        else
            @logger.info("Skipped writing the registry because the previous write is still in progress, it just takes long or may be hanging!")
        end
    end
end
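The registry itself is just a Marshal-dumped Hash mapping blob names to offsets and lengths; a round-trip sketch:

    reg = { 'a.json' => { :offset => 100, :length => 100 } }
    Marshal.load(Marshal.dump(reg)) == reg   # => true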
strip_comma(str) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 380
def strip_comma(str)
    # when skipping over the first blocks the json will start with a comma that needs to be stripped. there should not be a trailing comma, but it gets stripped too
    if str.start_with?(',')
       str[0] = ''
    end
    str.chomp(",")
end
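Usage examples:

    strip_comma(',{"a":1}')   # => '{"a":1}'
    strip_comma('{"a":1},')   # => '{"a":1}'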
try_list_blobs(fill) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 453
def try_list_blobs(fill)
    # inspired by: http://blog.mirthlab.com/2012/05/25/cleanly-retrying-blocks-of-code-after-an-exception-in-ruby/
    chrono = Time.now.to_i
    files = Hash.new
    nextMarker = nil
    counter = 1
    loop do
        blobs = @blob_client.list_blobs(container, { marker: nextMarker, prefix: @prefix})
        blobs.each do |blob|
            # FNM_PATHNAME is required so that "**/test" can match "test" at the root folder
            # FNM_EXTGLOB allows you to use "test{a,b,c}" to match either "testa", "testb" or "testc" (closer to shell behavior)
            # see the examples after this method
            unless blob.name == registry_path
                if @path_filters.any? {|path| File.fnmatch?(path, blob.name, File::FNM_PATHNAME | File::FNM_EXTGLOB)}
                    length = blob.properties[:content_length].to_i
                    offset = 0
                    if fill
                        offset = length
                    end
                    files.store(blob.name, { :offset => offset, :length => length })
                    if (@debug_until > @processed) then @logger.info("1: list_blobs #{blob.name} #{offset} #{length}") end
                end
            end
        end
        nextMarker = blobs.continuation_token
        break unless nextMarker && !nextMarker.empty?
        if (counter % 10 == 0) then @logger.info(" listing #{counter * 50000} files") end
        counter += 1
    end
    if @debug_timer
        @logger.info("list_blobs took #{Time.now.to_i - chrono} sec")
    end
    return files
end
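The two fnmatch flags in practice, with hypothetical blob names:

    File.fnmatch?('**/*.json', 'resourceId=/SUBSCRIPTIONS/0000/y=2023/m=01/PT1H.json', File::FNM_PATHNAME | File::FNM_EXTGLOB)  # => true
    File.fnmatch?('**/test', 'test', File::FNM_PATHNAME)        # => true, '**/' also matches zero directories
    File.fnmatch?('test{a,b,c}', 'testb', File::FNM_EXTGLOB)    # => true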
val(str) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 555
def val(str)
    return str.split('=')[1]
end
wadiislog(queue, name) click to toggle source
# File lib/logstash/inputs/azure_blob_storage.rb, line 423
def wadiislog(queue, name)
    count = 0
    # read the whole blob and emit one event per line, skipping the IIS header lines
    full_read(name).each_line do |line|
        unless line.start_with?('#')
            queue << LogStash::Event.new('message' => line.chomp)
            count += 1
        end
    end
    return count
  # date {
  #   match => [ "log_timestamp", "YYYY-MM-dd HH:mm:ss" ]
  #   target => "@timestamp"
  #   remove_field => ["log_timestamp"]
  # }
end