class LogStash::Outputs::AzureEventHubs

Output plugin to send events to an Azure Event Hub.

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 125
# Releases the resources held by this output: closes the Event Hubs client
# (when one was created) and shuts down the executor service.
#
# Any error raised by the client's closeSync still propagates to the caller,
# but the executor is shut down regardless so its threads are not leaked.
# Safe to call when neither resource has been set yet.
def close
  @eventhub_client.closeSync unless @eventhub_client.nil?
ensure
  # Runs even when closeSync raised above; guarded because the executor is
  # only assigned in register.
  @executor_service.shutdown unless @executor_service.nil?
end
receive(event) click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 133
# Hands the event to the configured codec for encoding.
#
# A failure raised while encoding is not propagated: it is logged as a
# warning together with the offending event.
def receive(event)
  @codec.encode(event)
rescue => encoding_error
  @logger.warn("Error encoding event", :exception => encoding_error, :event => event)
end
register() click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 66
# Prepares the output for use.
#
# Creates the executor service, then either connects an EventHubClient and
# routes encoded events to send_record, or (when the connection string is nil
# or whitespace-only) logs a warning and routes encoded events to
# log_no_msg_sent.
#
# Connection failures are handled as follows:
# * IllegalConnectionStringFormatException / IOException: logged, close is
#   called and exit(1) is invoked.
# * EventHubException / ExecutionException: retried after a fixed 2 second
#   pause until @connection_retry_count attempts have been used; an
#   EventHubException whose getIsTransient is not true is never retried.
#   When giving up, the error is logged, close is called and exit(1) is
#   invoked.
def register
  # The executor runs the asynchronous tasks and is handed to the
  # EventHubClient; its size comes from @client_threads.
  @executor_service = Executors.newScheduledThreadPool(@client_threads)

  # Shared fatal path: report the error, release resources, then exit(1).
  abort_registration = lambda do |error|
    @logger.error(
      "Unable to establish connection to Azure Event Hubs.",
      :error_message => error.getMessage,
      :class => error.class.name
    )
    close
    exit(1)
  end

  attempts = 0
  pause_seconds = 2
  begin
    connection_configured = !@connection_string.nil? && @connection_string !~ /\A\s*\Z/
    if connection_configured
      # Each EventHubClient opens its own TCP/SSL connection, so a single
      # instance is created here and kept on the plugin.
      @eventhub_client = EventHubClient.createSync(@connection_string, @executor_service)
      @codec.on_event(&method(:send_record))
    else
      @logger.warn("Connection String is empty, azure_event_hub output will be ignored and should not be called...")
      @codec.on_event(&method(:log_no_msg_sent))
    end
  rescue IllegalConnectionStringFormatException, IOException => e
    abort_registration.call(e)
  rescue EventHubException, ExecutionException => e
    # Give up on a non-transient Event Hubs error or once the retry budget
    # is spent; abort_registration does not return (exit raises).
    permanent_failure = e.is_a?(EventHubException) && e.getIsTransient != true
    abort_registration.call(e) if permanent_failure || attempts >= @connection_retry_count

    @logger.error(
      "Connection to Event Hubs failed, will attempt connection again.",
      :error_message => e.getMessage,
      :class => e.class.name,
      :retry_in_seconds => pause_seconds
    )

    # Pause, count the attempt, and re-run the begin block.
    sleep(pause_seconds)
    attempts += 1
    retry
  end
end

Private Instance Methods

log_no_msg_sent(event, payload) click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 162
# Codec callback installed by register when no connection string is set.
# Both arguments are ignored; a warning is logged and nothing is sent.
def log_no_msg_sent(event, payload)
  @logger.warn("Event ignored, connection string not set")
end
send_record(event, payload) click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 142
# Codec callback that ships one encoded event to the Event Hub.
#
# The payload bytes are wrapped in an EventData object. When @properties_bag
# is set, each key and value is expanded with event.sprintf and added to the
# EventData properties. The message is then sent synchronously through
# @eventhub_client.
#
# Any error raised along the way is logged as a warning with the event and is
# not re-raised.
def send_record(event, payload)
  # Wrap the payload bytes in an EventData object.
  payload_bytes = payload.to_java_bytes
  message = EventData.create(ByteBuffer::wrap(payload_bytes))

  # Copy the configured property bag onto the message.
  unless @properties_bag.nil?
    @properties_bag.each do |name_template, value_template|
      properties = message.getProperties
      property_name = event.sprintf(name_template).to_java_string
      property_value = event.sprintf(value_template).to_java_string
      properties.put(property_name, property_value)
    end
  end

  # Blocking send through the shared client.
  @eventhub_client.sendSync(message)
rescue => send_error
  @logger.warn("Error sending event", :exception => send_error, :event => event)
end