class LogStash::Outputs::AzureEventHubs
Output plugin to send events to an Azure Event Hub.
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 125 def close if (!@eventhub_client.nil?) @eventhub_client.closeSync(); end @executor_service.shutdown(); end
receive(event)
click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 133 def receive(event) begin @codec.encode(event) rescue => e @logger.warn("Error encoding event", :exception => e, :event => event) end end
register()
click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 66 def register # The Executor handles all the asynchronous tasks and this is passed to the EventHubClient. # The gives the user control to segregate their thread pool based on the work load. # This pool can then be shared across multiple EventHubClient instances. @executor_service = Executors.newScheduledThreadPool(@client_threads) # Handle Transient errors when creating the Event Hubs Client try = 0 retry_interval = 2 begin # Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive. # It is always a best practice to reuse these instances. if !@connection_string.nil? and @connection_string !~ /\A\s*\Z/ @eventhub_client = EventHubClient.createSync(@connection_string, @executor_service) @codec.on_event(&method(:send_record)) else @logger.warn("Connection String is empty, azure_event_hub output will be ignored and should not be called...") @codec.on_event(&method(:log_no_msg_sent)) end rescue IllegalConnectionStringFormatException, IOException => e @logger.error( "Unable to establish connection to Azure Event Hubs.", :error_message => e.getMessage(), :class => e.class.name ) close() exit(1) rescue EventHubException, ExecutionException => e # Log error, no retry if (e.is_a?(EventHubException) and e.getIsTransient() != true) or try >= @connection_retry_count @logger.error( "Unable to establish connection to Azure Event Hubs.", :error_message => e.getMessage(), :class => e.class.name ) close() exit(1) end # Log error with retry @logger.error( "Connection to Event Hubs failed, will attempt connection again.", :error_message => e.getMessage(), :class => e.class.name, :retry_in_seconds => retry_interval ) # Wait for interval sleep(retry_interval) # Add attempt and retry try += 1 retry end end
Private Instance Methods
log_no_msg_sent(event, payload)
click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 162 def log_no_msg_sent(event, payload) begin @logger.warn("Event ignored, connection string not set") end end
send_record(event, payload)
click to toggle source
# File lib/logstash/outputs/azure_event_hub.rb, line 142 def send_record(event, payload) begin # Create EventData object and convert payload to bytes eh_event = EventData.create(ByteBuffer::wrap(payload.to_java_bytes)) # Add property bag if (!@properties_bag.nil?) @properties_bag.each do |key, value| eh_event.getProperties().put(event.sprintf(key).to_java_string, event.sprintf(value).to_java_string) end end # Send using client @eventhub_client.sendSync(eh_event) rescue => e @logger.warn("Error sending event", :exception => e, :event => event) end end