class LogStash::Inputs::LogstashInputAzureblob

Logstash input plugin for Azure Blobs

This logstash plugin gathers data from Microsoft Azure Blobs

Constants

MAX

Constant of max integer

Public Instance Methods

acquire_lease(blob_name, retry_times = 30, interval_sec = 1) click to toggle source

Acquire a lease on a blob item with retries.

By default, it will retry 30 times with 1 second interval.

# File lib/logstash/inputs/azureblob.rb, line 228
def acquire_lease(blob_name, retry_times = 30, interval_sec = 1)
  lease = nil;
  retried = 0;
  while lease.nil? do
    begin
      lease = @azure_blob.acquire_blob_lease(@container, blob_name, {:timeout => 10})
    rescue StandardError => e
      if(e.type == 'LeaseAlreadyPresent')
          if (retried > retry_times)
              raise
          end
          retried += 1
          sleep interval_sec
      end
    end
  end #while
  return lease
end
cleanup_registry() click to toggle source

Clean up the registry.

# File lib/logstash/inputs/azureblob.rb, line 334
def cleanup_registry
  begin
    lease = nil
    lease = acquire_lease(@registry_locker)
    registry_hash = load_registry
    registry_hash.each { | key, registry_item|
      registry_item.reader = nil if registry_item.reader == @reader
    }
    save_registry(registry_hash)
    @azure_blob.release_blob_lease(@container, @registry_locker, lease)
    lease = nil
  rescue StandardError => e
    @logger.error("Oh My, An error occurred. #{e}:\n#{e.backtrace}", :exception => e)
  ensure
    @azure_blob.release_blob_lease(@container, @registry_locker, lease) unless lease.nil?
    lease = nil
  end #rescue
end
create_registry(blob_items) click to toggle source

Create a registry file to coordinate between multiple azure blob inputs.

# File lib/logstash/inputs/azureblob.rb, line 354
def create_registry (blob_items)
  registry_hash = Hash.new

  blob_items.each do |blob_item|
      initial_offset = 0
      initial_offset = blob_item.properties[:content_length] if @registry_create_policy == 'resume'
      registry_item = LogStash::Inputs::RegistryItem.new(blob_item.name, blob_item.properties[:etag], nil, initial_offset, 0)
    registry_hash[blob_item.name] = registry_item
  end # each
  save_registry(registry_hash)
  return registry_hash
end
deserialize_registry_hash(json_string) click to toggle source

Deserialize registry hash from json string.

# File lib/logstash/inputs/azureblob.rb, line 176
def deserialize_registry_hash (json_string)
  result = Hash.new
  temp_hash = JSON.parse(json_string)
  temp_hash.values.each { |kvp|
    result[kvp['file_path']] = LogStash::Inputs::RegistryItem.new(kvp['file_path'], kvp['etag'], kvp['reader'], kvp['offset'], kvp['gen'])
  }
  return result
end
list_all_blobs() click to toggle source

List all the blobs in the given container.

# File lib/logstash/inputs/azureblob.rb, line 186
def list_all_blobs
  blobs = Set.new []
  continuation_token = NIL
  @blob_list_page_size = 100 if @blob_list_page_size <= 0
  loop do
    # Need to limit the returned number of the returned entries to avoid out of memory exception.
    entries = @azure_blob.list_blobs(@container, { :timeout => 10, :marker => continuation_token, :max_results => @blob_list_page_size })
    entries.each do |entry|
      blobs << entry
    end # each
    continuation_token = entries.continuation_token
    break if continuation_token.empty?
  end # loop
  return blobs
end
load_registry() click to toggle source

Load the content of the registry into the registry hash and return it.

# File lib/logstash/inputs/azureblob.rb, line 368
def load_registry
  # Get content
  registry_blob, registry_blob_body = @azure_blob.get_blob(@container, @registry_path)
  registry_hash = deserialize_registry_hash(registry_blob_body)
  return registry_hash
end
process(queue) click to toggle source

Start processing the next item.

# File lib/logstash/inputs/azureblob.rb, line 124
def process(queue)
  begin
    blob, start_index, gen = register_for_read

    if(!blob.nil?)
      begin
        blob_name = blob.name
        # Work-around: After returned by get_blob, the etag will contains quotes.
        new_etag = blob.properties[:etag]
        # ~ Work-around

        blob, header = @azure_blob.get_blob(@container, blob_name, {:end_range => @file_head_bytes}) if header.nil? unless @file_head_bytes.nil? or @file_head_bytes <= 0

        if start_index == 0
          # Skip the header since it is already read.
          start_index = start_index + @file_head_bytes
        else
          # Adjust the offset when it is other than first time, then read till the end of the file, including the tail.
          start_index = start_index - @file_tail_bytes
          start_index = 0 if start_index < 0
        end

        blob, content = @azure_blob.get_blob(@container, blob_name, {:start_range => start_index} )
        
        # content will be used to calculate the new offset. Create a new variable for processed content.
        processed_content = content
        if(!@record_preprocess_reg_exp.nil?)
          reg_exp = Regexp.new(@record_preprocess_reg_exp, Regexp::MULTILINE)
          processed_content = content.sub(reg_exp, '')
        end

        # Putting header and content and tail together before pushing into event queue
        processed_content = "#{header}#{processed_content}" unless header.nil? || header.length == 0
                            
        @codec.decode(processed_content) do |event|
          decorate(event)
          queue << event
        end # decode
      ensure
        # Making sure the reader is removed from the registry even when there's exception.
        new_offset = start_index
        new_offset = new_offset + content.length unless content.nil?
        new_registry_item = LogStash::Inputs::RegistryItem.new(blob_name, new_etag, nil, new_offset, gen)
        update_registry(new_registry_item)
      end # begin
    end # if
  rescue StandardError => e
    @logger.error("Oh My, An error occurred. \nError:#{e}:\nTrace:\n#{e.backtrace}", :exception => e)
  end # begin
end
raise_gen(registry_hash, file_path) click to toggle source

Raise generation for blob in registry

# File lib/logstash/inputs/azureblob.rb, line 203
def raise_gen(registry_hash, file_path)
  begin
    target_item = registry_hash[file_path]
    begin
      target_item.gen += 1
      # Protect gen from overflow.
      target_item.gen = target_item.gen / 2 if target_item.gen == MAX
    rescue StandardError => e
      @logger.error("Fail to get the next generation for target item #{target_item}.", :exception => e)
      target_item.gen = 0
    end

    min_gen_item = registry_hash.values.min_by { |x| x.gen }
    while min_gen_item.gen > 0
      registry_hash.values.each { |value| 
        value.gen -= 1
      }
      min_gen_item = registry_hash.values.min_by { |x| x.gen }
    end
  end
end
register() click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 98
def register
  # this is the reader # for this specific instance.
  @reader = SecureRandom.uuid
  @registry_locker = "#{@registry_path}.lock"
 
  # Setup a specific instance of an Azure::Storage::Client
  client = Azure::Storage::Client.create(:storage_account_name => @storage_account_name, :storage_access_key => @storage_access_key, :storage_blob_host => "https://#{@storage_account_name}.blob.#{@endpoint}")
  # Get an azure storage blob service object from a specific instance of an Azure::Storage::Client
  @azure_blob = client.blob_client
  # Add retry filter to the service object
  @azure_blob.with_filter(Azure::Storage::Core::Filter::ExponentialRetryPolicyFilter.new)
end
register_for_read() click to toggle source

Return the next blob for reading as well as the start index.

# File lib/logstash/inputs/azureblob.rb, line 248
def register_for_read
  begin
    all_blobs = list_all_blobs
    registry = all_blobs.find { |item| item.name.downcase == @registry_path  }
    registry_locker = all_blobs.find { |item| item.name.downcase == @registry_locker }

    candidate_blobs = all_blobs.select { |item| (item.name.downcase != @registry_path) && ( item.name.downcase != @registry_locker ) }
    
    start_index = 0
    gen = 0
    lease = nil

    # Put lease on locker file than the registy file to allow update of the registry as a workaround for Azure Storage Ruby SDK issue # 16.
    # Workaround: https://github.com/Azure/azure-storage-ruby/issues/16
    registry_locker = @azure_blob.create_block_blob(@container, @registry_locker, @reader) if registry_locker.nil?
    lease = acquire_lease(@registry_locker)
    # ~ Workaround

    if(registry.nil?)
      registry_hash = create_registry(candidate_blobs)
    else
      registry_hash = load_registry
    end #if
      
    picked_blobs = Set.new []
    # Pick up the next candidate
    picked_blob = nil
    candidate_blobs.each { |candidate_blob|
      registry_item = registry_hash[candidate_blob.name]

      # Appending items that doesn't exist in the hash table
      if registry_item.nil?
        registry_item = LogStash::Inputs::RegistryItem.new(candidate_blob.name, candidate_blob.properties[:etag], nil, 0, 0)
        registry_hash[candidate_blob.name] = registry_item
      end # if
      
      if ((registry_item.offset < candidate_blob.properties[:content_length]) && (registry_item.reader.nil? || registry_item.reader == @reader))
        picked_blobs << candidate_blob
      end
    }

    picked_blob = picked_blobs.min_by { |b| registry_hash[b.name].gen }
    if !picked_blob.nil?
      registry_item = registry_hash[picked_blob.name]
      registry_item.reader = @reader
      registry_hash[picked_blob.name] = registry_item
      start_index = registry_item.offset
      raise_gen(registry_hash, picked_blob.name)
      gen = registry_item.gen
    end #if

    # Save the chnage for the registry
    save_registry(registry_hash)
    
    @azure_blob.release_blob_lease(@container, @registry_locker, lease)
    lease = nil;

    return picked_blob, start_index, gen
  rescue StandardError => e
    @logger.error("Oh My, An error occurred. #{e}:\n#{e.backtrace}", :exception => e)
    return nil, nil, nil
  ensure
    @azure_blob.release_blob_lease(@container, @registry_locker, lease) unless lease.nil?
    lease = nil
  end # rescue
end
run(queue) click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 111
def run(queue)
  # we can abort the loop if stop? becomes true
  while !stop?
    process(queue)
    Stud.stoppable_sleep(@interval) { stop? }
  end # loop
end
save_registry(registry_hash) click to toggle source

Serialize the registry hash and save it.

# File lib/logstash/inputs/azureblob.rb, line 376
def save_registry(registry_hash)
  # Serialize hash to json
  registry_hash_json = JSON.generate(registry_hash)

  # Upload registry to blob
  @azure_blob.create_block_blob(@container, @registry_path, registry_hash_json)
end
stop() click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 119
def stop
  cleanup_registry
end
update_registry(registry_item) click to toggle source

Update the registry

# File lib/logstash/inputs/azureblob.rb, line 316
def update_registry (registry_item)
  begin
    lease = nil
    lease = acquire_lease(@registry_locker)
    registry_hash = load_registry
    registry_hash[registry_item.file_path] = registry_item
    save_registry(registry_hash)
    @azure_blob.release_blob_lease(@container, @registry_locker, lease)
    lease = nil
  rescue StandardError => e
    @logger.error("Oh My, An error occurred. #{e}:\n#{e.backtrace}", :exception => e)
  ensure
    @azure_blob.release_blob_lease(@container, @registry_locker, lease) unless lease.nil?
    lease = nil
  end #rescue
end