class LogStash::Inputs::AzureBlobStorage
This is a Logstash input plugin for files in Azure Blob Storage. There is a storage explorer in the Azure portal and a desktop application with the same name at storageexplorer.com. A storage account has a globally unique name, {storageaccount}.blob.core.windows.net, which is a CNAME to Azure's blob servers blob.*.store.core.windows.net. A storage account holds containers, which in turn hold directories and blobs (comparable to files). Blobs consist of one or more blocks; after the blocks are written, they can be committed. Some Azure diagnostics can send events to an Event Hub, which can be parsed through the plugin logstash-input-azure_event_hubs, but for events that are only stored in a storage account, use this plugin. The original logstash-input-azureblob from azure-diagnostics-tools works for low volumes, but it suffers from an outdated client, slow reads, lease locking issues and JSON parse errors. azure.microsoft.com/en-us/services/storage/blobs/
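For orientation, here is a minimal sketch of that hierarchy using the same azure-storage-blob gem this plugin builds on: a storage account holds containers, containers hold blobs, and block blobs consist of committed blocks. The account name, access key and container name below are made-up placeholders.

require 'azure/storage/blob'

# Hypothetical credentials and container; replace with your own values.
client = Azure::Storage::Blob::BlobService.create(
  storage_account_name: 'examplestorageaccount',
  storage_access_key:   'base64-access-key==',
  storage_dns_suffix:   'core.windows.net'
)

container = 'insights-logs-networksecuritygroupflowevent'

# List blobs under a prefix and inspect their committed blocks.
client.list_blobs(container, prefix: 'resourceId=').each do |blob|
  length = blob.properties[:content_length].to_i
  blocks = client.list_blob_blocks(container, blob.name)[:committed]
  puts "#{blob.name} (#{length} bytes, #{blocks.size} committed blocks)"
end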
Public Instance Methods
# File lib/logstash/inputs/azure_blob_storage.rb, line 313
def close
  save_registry(@registry)
end
TODO: Other feature requests
- show file path in logger
- add filepath as part of log message
- option to keep registry on local disk
# File lib/logstash/inputs/azure_blob_storage.rb, line 106
def register
  @pipe_id = Thread.current[:name].split("[").last.split("]").first
  @logger.info("=== #{config_name} #{Gem.loaded_specs["logstash-input-"+config_name].version.to_s} / #{@pipe_id} / #{@id[0,6]} / ruby #{ RUBY_VERSION }p#{ RUBY_PATCHLEVEL } ===")
  @logger.info("If this plugin doesn't work, please raise an issue in https://github.com/janmg/logstash-input-azure_blob_storage")
  # TODO: consider multiple readers, so add pipeline @id or use logstash-to-logstash communication?
  # TODO: Implement retry ... Error: Connection refused - Failed to open TCP connection to
end
# File lib/logstash/inputs/azure_blob_storage.rb, line 116
def run(queue)
  # counter for all processed events since the start of this pipeline
  @processed = 0
  @regsaved = @processed

  connect

  @registry = Hash.new
  if registry_create_policy == "resume"
    for counter in 1..3
      begin
        if (!@registry_local_path.nil?)
          unless File.file?(@registry_local_path+"/"+@pipe_id)
            @registry = Marshal.load(@blob_client.get_blob(container, registry_path)[1]) #[0] headers [1] responsebody
            @logger.info("migrating from remote registry #{registry_path}")
          else
            if !Dir.exist?(@registry_local_path)
              FileUtils.mkdir_p(@registry_local_path)
            end
            @registry = Marshal.load(File.read(@registry_local_path+"/"+@pipe_id))
            @logger.info("resuming from local registry #{registry_local_path+"/"+@pipe_id}")
          end
        else
          @registry = Marshal.load(@blob_client.get_blob(container, registry_path)[1]) #[0] headers [1] responsebody
          @logger.info("resuming from remote registry #{registry_path}")
        end
        break
      rescue Exception => e
        @logger.error("caught: #{e.message}")
        @registry.clear
        @logger.error("loading registry failed for attempt #{counter} of 3")
      end
    end
  end

  # read filelist and set offsets to file length to mark all the old files as done
  if registry_create_policy == "start_fresh"
    @registry = list_blobs(true)
    save_registry(@registry)
    @logger.info("starting fresh, writing a clean registry to contain #{@registry.size} blobs/files")
  end

  @is_json = false
  begin
    if @codec.class.name.eql?("LogStash::Codecs::JSON")
      @is_json = true
    end
  end
  @head = ''
  @tail = ''
  # if codec=json, sniff one file's blocks A and Z to learn file_head and file_tail
  if @is_json
    if file_head
      @head = file_head
    end
    if file_tail
      @tail = file_tail
    end
    if file_head and file_tail and !skip_learning
      learn_encapsulation
    end
    @logger.info("head will be: #{@head} and tail is set to #{@tail}")
  end

  newreg = Hash.new
  filelist = Hash.new
  worklist = Hash.new
  @last = start = Time.now.to_i

  # This is the main loop, it
  # 1. Lists all the files in the remote storage account that match the path prefix
  # 2. Filters on path_filters to only include files that match the directory and file glob (**/*.json)
  # 3. Saves the listed files in a registry of known files and filesizes
  # 4. Lists all the files again, compares the registry with the new filelist and puts the delta in a worklist
  # 5. Processes the worklist and puts all events in the logstash queue
  # 6. If there is time left, sleeps to complete the interval. If processing takes more than an interval, saves the registry and continues
  # 7. If the stop signal comes, finishes the current file, saves the registry and quits
  while !stop?
    # load the registry, compare its offsets to the file list, set offset to 0 for new files,
    # process the whole list and if finished within the interval wait for the next loop
    # TODO: sort by timestamp ?
    #filelist.sort_by(|k,v|resource(k)[:date])
    worklist.clear
    filelist.clear
    newreg.clear

    # Listing all the files
    filelist = list_blobs(false)
    filelist.each do |name, file|
      off = 0
      begin
        off = @registry[name][:offset]
      rescue
        off = 0
      end
      newreg.store(name, { :offset => off, :length => file[:length] })
      if (@debug_until > @processed) then @logger.info("2: adding offsets: #{name} #{off} #{file[:length]}") end
    end
    # size nilClass when the list doesn't grow?!

    # Worklist is the subset of files where the already read offset is smaller than the file size
    worklist.clear
    chunk = nil

    worklist = newreg.select {|name,file| file[:offset] < file[:length]}
    if (worklist.size > 4) then @logger.info("worklist contains #{worklist.size} blobs") end

    # Start of processing
    # This would be ideal for threading since it's IO intensive, would be nice with a ruby native ThreadPool
    if (worklist.size > 0) then
      worklist.each do |name, file|
        start = Time.now.to_i
        if (@debug_until > @processed) then @logger.info("3: processing #{name} from #{file[:offset]} to #{file[:length]}") end
        size = 0
        if file[:offset] == 0
          # This is where the Sera4000 issue starts
          # For an append blob, reading full and crashing, retry, last_modified? ... length? ... committed?
          # length and skip reg value
          if (file[:length] > 0)
            begin
              chunk = full_read(name)
              size = chunk.size
            rescue Exception => e
              @logger.error("Failed to read #{name} because of: #{e.message} .. will continue, set file as read and pretend this never happened")
              @logger.error("#{size} size and #{file[:length]} file length")
              size = file[:length]
            end
          else
            @logger.info("found a zero size file #{name}")
            chunk = nil
          end
        else
          chunk = partial_read_json(name, file[:offset], file[:length])
          @logger.debug("partial file #{name} from #{file[:offset]} to #{file[:length]}")
        end

        if logtype == "nsgflowlog" && @is_json
          # skip empty chunks
          unless chunk.nil?
            res = resource(name)
            begin
              fingjson = JSON.parse(chunk)
              @processed += nsgflowlog(queue, fingjson, name)
              @logger.debug("Processed #{res[:nsg]} [#{res[:date]}] #{@processed} events")
            rescue JSON::ParserError
              @logger.error("parse error on #{res[:nsg]} [#{res[:date]}] offset: #{file[:offset]} length: #{file[:length]}")
            end
          end
        # TODO: Convert this to line based grokking.
        # TODO: ECS Compliance?
        elsif logtype == "wadiis" && !@is_json
          @processed += wadiislog(queue, name)
        else
          counter = 0
          begin
            @codec.decode(chunk) do |event|
              counter += 1
              if @addfilename
                event.set('filename', name)
              end
              decorate(event)
              queue << event
            end
          rescue Exception => e
            @logger.error("codec exception: #{e.message} .. will continue and pretend this never happened")
            @registry.store(name, { :offset => file[:length], :length => file[:length] })
            @logger.debug("#{chunk}")
          end
          @processed += counter
        end
        @registry.store(name, { :offset => size, :length => file[:length] })
        # TODO add input plugin option to prevent connection cache
        @blob_client.client.reset_agents!
        #@logger.info("name #{name} size #{size} len #{file[:length]}")
        # if stop? good moment to stop what we're doing
        if stop?
          return
        end
        if ((Time.now.to_i - @last) > @interval)
          save_registry(@registry)
        end
      end
    end

    # The files that got processed after the last registry save need to be saved too,
    # in case the worklist is empty for some intervals.
    now = Time.now.to_i
    if ((now - @last) > @interval)
      save_registry(@registry)
    end
    sleeptime = interval - ((now - start) % interval)
    if @debug_timer
      @logger.info("going to sleep for #{sleeptime} seconds")
    end
    Stud.stoppable_sleep(sleeptime) { stop? }
  end
end
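The delta step in the loop above (steps 3 and 4) reduces to a hash comparison: the registry remembers the last read offset per blob, and only blobs whose offset is smaller than their current length land on the worklist. A standalone sketch of that selection, with made-up blob names and sizes:

# Registry as saved after the previous interval: blob name => last read offset and length.
registry = {
  'logs/a.json' => { :offset => 2048, :length => 2048 },  # fully read last time
  'logs/b.json' => { :offset => 1024, :length => 4096 }   # partially read
}

# Fresh listing of the same container: b.json has grown, c.json is new.
filelist = {
  'logs/a.json' => { :length => 2048 },
  'logs/b.json' => { :length => 6144 },
  'logs/c.json' => { :length => 512 }
}

# Merge: keep known offsets, default new blobs to offset 0.
newreg = {}
filelist.each do |name, file|
  off = registry.key?(name) ? registry[name][:offset] : 0
  newreg[name] = { :offset => off, :length => file[:length] }
end

# Worklist: only blobs with unread bytes.
worklist = newreg.select { |_name, file| file[:offset] < file[:length] }
# => {"logs/b.json"=>{:offset=>1024, :length=>6144}, "logs/c.json"=>{:offset=>0, :length=>512}}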
# File lib/logstash/inputs/azure_blob_storage.rb, line 310
def stop
  save_registry(@registry)
end
Private Instance Methods
# File lib/logstash/inputs/azure_blob_storage.rb, line 319
def connect
  # Try in this order to access the storageaccount
  # 1. storageaccount / sas_token
  # 2. connection_string
  # 3. storageaccount / access_key
  unless connection_string.nil?
    conn = connection_string.value
  end
  unless sas_token.nil?
    unless sas_token.value.start_with?('?')
      conn = "BlobEndpoint=https://#{storageaccount}.#{dns_suffix};SharedAccessSignature=#{sas_token.value}"
    else
      conn = sas_token.value
    end
  end
  unless conn.nil?
    @blob_client = Azure::Storage::Blob::BlobService.create_from_connection_string(conn)
  else
    # unless use_development_storage?
    @blob_client = Azure::Storage::Blob::BlobService.create(
      storage_account_name: storageaccount,
      storage_dns_suffix: dns_suffix,
      storage_access_key: access_key.value,
    )
    # else
    #   @logger.info("not yet implemented")
    # end
  end
end
# File lib/logstash/inputs/azure_blob_storage.rb, line 350
def full_read(filename)
  tries ||= 2
  begin
    return @blob_client.get_blob(container, filename)[1]
  rescue Exception => e
    @logger.error("caught: #{e.message} for full_read")
    if (tries -= 1) > 0
      # reconnect before retrying when the TCP connection was reset
      if e.message == "Connection reset by peer"
        connect
      end
      retry
    end
  end
  # final attempt after the retries are exhausted
  chunk = @blob_client.get_blob(container, filename)[1]
  return chunk
end
# File lib/logstash/inputs/azure_blob_storage.rb, line 516
def learn_encapsulation
  @logger.info("learn_encapsulation, this can be skipped by setting skip_learning => true. Or set both head_file and tail_file")
  # From one file, read first block and last block to learn head and tail
  begin
    blobs = @blob_client.list_blobs(container, { max_results: 3, prefix: @prefix})
    blobs.each do |blob|
      unless blob.name == registry_path
        begin
          blocks = @blob_client.list_blob_blocks(container, blob.name)[:committed]
          if blocks.first.name.start_with?('A00')
            @logger.debug("using #{blob.name}/#{blocks.first.name} to learn the json header")
            @head = @blob_client.get_blob(container, blob.name, start_range: 0, end_range: blocks.first.size-1)[1]
          end
          if blocks.last.name.start_with?('Z00')
            @logger.debug("using #{blob.name}/#{blocks.last.name} to learn the json footer")
            length = blob.properties[:content_length].to_i
            offset = length - blocks.last.size
            @tail = @blob_client.get_blob(container, blob.name, start_range: offset, end_range: length-1)[1]
            @logger.debug("learned tail: #{@tail}")
          end
        rescue Exception => e
          @logger.info("learn json one of the attempts failed #{e.message}")
        end
      end
    end
  rescue Exception => e
    @logger.info("learn json header and footer failed because #{e.message}")
  end
end
List all blobs in the blob store, set the offsets from the registry and return the file list. Inspired by: github.com/Azure-Samples/storage-blobs-ruby-quickstart/blob/master/example.rb
# File lib/logstash/inputs/azure_blob_storage.rb, line 441
def list_blobs(fill)
  tries ||= 3
  begin
    return try_list_blobs(fill)
  rescue Exception => e
    @logger.error("caught: #{e.message} for list_blobs retries left #{tries}")
    if (tries -= 1) > 0
      retry
    end
  end
end
# File lib/logstash/inputs/azure_blob_storage.rb, line 389
def nsgflowlog(queue, json, name)
  count = 0
  json["records"].each do |record|
    res = resource(record["resourceId"])
    resource = { :subscription => res[:subscription], :resourcegroup => res[:resourcegroup], :nsg => res[:nsg] }
    @logger.trace(resource.to_s)
    record["properties"]["flows"].each do |flows|
      rule = resource.merge ({ :rule => flows["rule"]})
      flows["flows"].each do |flowx|
        flowx["flowTuples"].each do |tup|
          tups = tup.split(',')
          ev = rule.merge({:unixtimestamp => tups[0], :src_ip => tups[1], :dst_ip => tups[2], :src_port => tups[3], :dst_port => tups[4], :protocol => tups[5], :direction => tups[6], :decision => tups[7]})
          if (record["properties"]["Version"]==2)
            tups[9] = 0 if tups[9].nil?
            tups[10] = 0 if tups[10].nil?
            tups[11] = 0 if tups[11].nil?
            tups[12] = 0 if tups[12].nil?
            ev.merge!( {:flowstate => tups[8], :src_pack => tups[9], :src_bytes => tups[10], :dst_pack => tups[11], :dst_bytes => tups[12]} )
          end
          @logger.trace(ev.to_s)
          if @addfilename
            ev.merge!( {:filename => name } )
          end
          event = LogStash::Event.new('message' => ev.to_json)
          decorate(event)
          queue << event
          count += 1
        end
      end
    end
  end
  return count
end
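Each flowTuple above is one comma-separated string; the field order assumed by the code is unix timestamp, source IP, destination IP, source port, destination port, protocol, direction, decision, and, for Version 2 records, flow state plus packet and byte counters. A worked example with a made-up tuple:

# Hypothetical Version 2 flow tuple; counters can be missing for some flow states.
tup  = '1672531200,10.0.0.4,10.0.0.5,49152,443,T,O,A,B,10,2000,12,3000'
tups = tup.split(',')

ev = { :unixtimestamp => tups[0], :src_ip => tups[1], :dst_ip => tups[2],
       :src_port => tups[3], :dst_port => tups[4], :protocol => tups[5],
       :direction => tups[6], :decision => tups[7] }

# Version 2 adds flow state and traffic counters; absent counters default to 0.
ev.merge!(:flowstate => tups[8],
          :src_pack => tups[9] || 0, :src_bytes => tups[10] || 0,
          :dst_pack => tups[11] || 0, :dst_bytes => tups[12] || 0)
# ev[:protocol] => "T", ev[:direction] => "O", ev[:decision] => "A"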
# File lib/logstash/inputs/azure_blob_storage.rb, line 369
def partial_read_json(filename, offset, length)
  content = @blob_client.get_blob(container, filename, start_range: offset-@tail.length, end_range: length-1)[1]
  if content.end_with?(@tail)
    # the tail is part of the last block, so included in the total length of the get_blob
    return @head + strip_comma(content)
  else
    # when the file has grown between list_blobs and the time of partial reading, the tail will be wrong
    return @head + strip_comma(content[0...-@tail.length]) + @tail
  end
end
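Because a partial read starts somewhere in the middle of the JSON array of records, the returned byte range only becomes parseable once the learned (or configured) head and tail are glued back on and the leading comma is stripped. A toy illustration, assuming the usual NSG flow log wrapper and a made-up record:

require 'json'

head = '{"records":['   # typically learned from block A00 or set via file_head
tail = ']}'             # typically learned from block Z00 or set via file_tail

# What get_blob returns for the range (offset - tail.length) .. (length - 1):
# the comma that separated it from the previous record, the new record(s) and the tail.
content = ',{"time":"2023-01-01T00:05:00Z","category":"NetworkSecurityGroupFlowEvent"}]}'

# Same reassembly as partial_read_json when the content ends with the tail.
chunk = head + content.sub(/\A,/, '')
JSON.parse(chunk)["records"].size   # => 1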
# File lib/logstash/inputs/azure_blob_storage.rb, line 546
def resource(str)
  temp = str.split('/')
  date = '---'
  unless temp[9].nil?
    date = val(temp[9])+'/'+val(temp[10])+'/'+val(temp[11])+'-'+val(temp[12])+':00'
  end
  return {:subscription=> temp[2], :resourcegroup=>temp[4], :nsg=>temp[8], :date=>date}
end
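The index positions used above follow the standard NSG flow log blob path. Splitting a (hypothetical) path on '/' puts the subscription at index 2, the resource group at index 4, the NSG name at index 8 and the y=/m=/d=/h= segments at indices 9 through 12:

str = 'resourceId=/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/example-rg' \
      '/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/example-nsg' \
      '/y=2023/m=01/d=01/h=00/m=00/macAddress=000D3A000000/PT1H.json'

temp = str.split('/')
temp[2]                                  # => "00000000-0000-0000-0000-000000000000" (subscription)
temp[4]                                  # => "example-rg" (resourcegroup)
temp[8]                                  # => "example-nsg" (nsg)
temp[9..12].map { |t| t.split('=')[1] }  # => ["2023", "01", "01", "00"] (date parts)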
When events were processed after the last registry save, start a thread to update the registry file.
# File lib/logstash/inputs/azure_blob_storage.rb, line 488
def save_registry(filelist)
  # Because of threading, processed values and regsaved are not thread safe, they can change as instance variable @!
  # Most of the time this is fine because the registry is the last resort, but be careful about corner cases!
  unless @processed == @regsaved
    @regsaved = @processed
    unless (@busy_writing_registry)
      Thread.new {
        begin
          @busy_writing_registry = true
          unless (@registry_local_path)
            @blob_client.create_block_blob(container, registry_path, Marshal.dump(filelist))
            @logger.info("processed #{@processed} events, saving #{filelist.size} blobs and offsets to remote registry #{registry_path}")
          else
            File.open(@registry_local_path+"/"+@pipe_id, 'w') { |file| file.write(Marshal.dump(filelist)) }
            @logger.info("processed #{@processed} events, saving #{filelist.size} blobs and offsets to local registry #{registry_local_path+"/"+@pipe_id}")
          end
          @busy_writing_registry = false
          @last = Time.now.to_i
        rescue
          @logger.error("Oh my, registry write failed, do you have write access?")
        end
      }
    else
      @logger.info("Skipped writing the registry because previous write still in progress, it just takes long or may be hanging!")
    end
  end
end
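The registry itself is just a Hash mapping a blob name to its read offset and length, serialized with Marshal, so a saved registry can be inspected or migrated by hand. A small sketch using a hypothetical local registry file path:

# Registry entries record how far each blob has been read.
registry = { 'logs/example.json' => { :offset => 4096, :length => 4096 } }

# Written either locally (registry_local_path/pipe_id) or as a block blob (registry_path).
File.open('/tmp/registry/main', 'wb') { |f| f.write(Marshal.dump(registry)) }

# Reading it back yields the same Hash, which is how registry_create_policy => "resume" restores offsets.
restored = Marshal.load(File.read('/tmp/registry/main'))
restored['logs/example.json'][:offset]   # => 4096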
# File lib/logstash/inputs/azure_blob_storage.rb, line 380
def strip_comma(str)
  # when skipping over the first blocks the json will start with a comma that needs to be stripped. there should not be a trailing comma, but it gets stripped too
  if str.start_with?(',')
    str[0] = ''
  end
  str.nil? ? nil : str.chomp(",")
end
# File lib/logstash/inputs/azure_blob_storage.rb, line 453
def try_list_blobs(fill)
  # inspired by: http://blog.mirthlab.com/2012/05/25/cleanly-retrying-blocks-of-code-after-an-exception-in-ruby/
  chrono = Time.now.to_i
  files = Hash.new
  nextMarker = nil
  counter = 1
  loop do
    blobs = @blob_client.list_blobs(container, { marker: nextMarker, prefix: @prefix})
    blobs.each do |blob|
      # FNM_PATHNAME is required so that "**/test" can match "test" at the root folder
      # FNM_EXTGLOB allows you to use "test{a,b,c}" to match either "testa", "testb" or "testc" (closer to shell behavior)
      unless blob.name == registry_path
        if @path_filters.any? {|path| File.fnmatch?(path, blob.name, File::FNM_PATHNAME | File::FNM_EXTGLOB)}
          length = blob.properties[:content_length].to_i
          offset = 0
          if fill
            offset = length
          end
          files.store(blob.name, { :offset => offset, :length => length })
          if (@debug_until > @processed) then @logger.info("1: list_blobs #{blob.name} #{offset} #{length}") end
        end
      end
    end
    nextMarker = blobs.continuation_token
    break unless nextMarker && !nextMarker.empty?
    if (counter % 10 == 0) then @logger.info(" listing #{counter * 50000} files") end
    counter += 1
  end
  if @debug_timer
    @logger.info("list_blobs took #{Time.now.to_i - chrono} sec")
  end
  return files
end
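The path_filters match above relies on File.fnmatch? with FNM_PATHNAME (a single * does not cross directory separators, while **/ also matches at the root) and FNM_EXTGLOB (brace alternatives like {a,b}). A few hypothetical examples of what passes the filter:

flags = File::FNM_PATHNAME | File::FNM_EXTGLOB

File.fnmatch?('**/*.json', 'dir/sub/file.json', flags)             # => true
File.fnmatch?('**/*.json', 'file.json', flags)                     # => true  (matches at the root folder)
File.fnmatch?('*.json', 'dir/file.json', flags)                    # => false (* does not cross '/')
File.fnmatch?('**/PT{1,2}H.json', 'nsg/y=2023/PT1H.json', flags)   # => true  (brace expansion via FNM_EXTGLOB)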
# File lib/logstash/inputs/azure_blob_storage.rb, line 555
def val(str)
  return str.split('=')[1]
end
# File lib/logstash/inputs/azure_blob_storage.rb, line 423
def wadiislog(lines)
  count = 0
  lines.each do |line|
    # skip the IIS header lines that start with '#'
    unless line.start_with?('#')
      queue << LogStash::Event.new('message' => line)
      count += 1
    end
  end
  return count
  # date {
  #   match => [ "log_timestamp", "YYYY-MM-dd HH:mm:ss" ]
  #   target => "@timestamp"
  #   remove_field => ["log_timestamp"]
  # }
end