class LogStash::Inputs::Azureblob

Reads events from Azure Blobs

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/logstash/inputs/azureblob.rb, line 22
def initialize(*args)
  super(*args)
end

Public Instance Methods

acquire_lock(blob_name) click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 49
def acquire_lock(blob_name)
  @azure_blob.create_page_blob(@container, blob_name, 512)
  @azure_blob.acquire_lease(@container, blob_name,{:duration=>60, :timeout=>10, :proposed_lease_id=>SecureRandom.uuid})
  return true
rescue LogStash::ShutdownSignal => e
  raise e
rescue => e
  @logger.error("Caught exception while locking", :exception => e)
  return false
end
list_blob_names() click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 35
def list_blob_names
  blob_names = Set.new []
  loop do
    continuation_token = NIL
    entries = @azure_blob.list_blobs(@container, { :timeout => 10, :marker => continuation_token})
    entries.each do |entry|
      blob_names << entry.name
    end
    continuation_token = entries.continuation_token
    break if continuation_token.empty?
  end
  return blob_names
end
lock_blob(blob_names) click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 60
def lock_blob(blob_names)
  real_blob_names = blob_names.select { |name| !name.end_with?(".lock") }
  real_blob_names.each do |blob_name|
    if !blob_names.include?(blob_name + ".lock")
      if acquire_lock(blob_name + ".lock")
        return blob_name
      end
    end
  end
  return NIL
end
process(output_queue) click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 72
def process(output_queue)
  blob_names = list_blob_names
  blob_name = lock_blob(blob_names)
  return if !blob_name
  blob, content = @azure_blob.get_blob(@container, blob_name)
  @codec.decode(content) do |event|
    output_queue << event
  end
rescue LogStash::ShutdownSignal => e
  raise e
rescue => e
  @logger.error("Oh My, An error occurred.", :exception => e)
end
register() click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 27
def register
  Azure.configure do |config|
    config.storage_account_name = @storage_account_name
    config.storage_access_key = @storage_access_key
  end
  @azure_blob = Azure::Blob::BlobService.new
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 87
def run(output_queue)
  while true
    process(output_queue)
  end # loop
end
teardown() click to toggle source
# File lib/logstash/inputs/azureblob.rb, line 94
def teardown
end