class LogStash::Inputs::Stomp
Creates events from messages received over the STOMP protocol.
Attributes
client[RW]
Public Instance Methods
new_client()
click to toggle source
# File lib/logstash/inputs/stomp.rb, line 70
# Builds a fresh OnStomp client for the configured host and port,
# authenticating with the configured user and password.
#
# @return [OnStomp::Client] a newly constructed client (this method does not call connect on it)
def new_client
  broker_uri = "stomp://#{@host}:#{@port}"
  passcode = @password.value
  OnStomp::Client.new(broker_uri, :login => @user, :passcode => passcode)
end
register()
click to toggle source
# File lib/logstash/inputs/stomp.rb, line 62
# Loads the onstomp library, creates the client, applies the optional
# virtual host and records the STOMP URL for this input.
#
# @return [String] the value stored in @stomp_url
def register
  # Loaded here rather than at file load time.
  require "onstomp"

  @client = new_client
  if @vhost
    @client.host = @vhost
  end

  # NOTE(review): @password is interpolated directly here (its to_s), whereas
  # new_client uses @password.value -- presumably to_s is a masked form; confirm.
  credentials = "#{@user}:#{@password}"
  endpoint = "#{@host}:#{@port}"
  @stomp_url = "stomp://#{credentials}@#{endpoint}/#{@destination}"
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/stomp.rb, line 97
# Entry point of the input: connects to the broker, subscribes to the
# destination and re-establishes both whenever the connection is closed.
#
# @param output_queue [#<<] queue onto which decoded events are pushed
# @return whatever the final subscription_handler call returns
def run(output_queue)
  # Store the queue before the close handler is registered, so a handler
  # that fires during the initial connect never sees a nil @output_queue.
  @output_queue = output_queue

  # Handle disconnects
  @client.on_connection_closed do
    # stop disconnects the client on purpose; reconnecting while the plugin
    # is shutting down would leave a connection open behind it.
    unless stop?
      # connect is private, so it must be called without an explicit
      # receiver (self.connect raises NoMethodError before Ruby 2.7).
      connect
      subscription_handler # is required for re-subscribing to the destination
    end
  end

  connect
  subscription_handler
end
stop()
click to toggle source
# File lib/logstash/inputs/stomp.rb, line 109
# Disconnects the client, but only when a client exists and reports
# itself as connected; otherwise does nothing.
def stop
  return unless @client && @client.connected?

  @client.disconnect
end
Private Instance Methods
connect()
click to toggle source
# File lib/logstash/inputs/stomp.rb, line 44
# Opens the client connection. On a connect failure it either waits
# @reconnect_interval seconds and tries again (when @reconnect is set and
# the plugin is not stopping) or logs the failure and returns.
#
# The rescued errors are logged, not re-raised.
def connect
  @client.connect
  @logger.info("Connected to stomp server") if @client.connected?
rescue OnStomp::ConnectFailedError, OnStomp::UnsupportedProtocolVersionError, Errno::ECONNREFUSED => error
  if @reconnect && !stop?
    @logger.warn("Failed to connect to stomp server. Retry in #{@reconnect_interval} seconds. #{error.inspect}")
    @logger.debug(error.backtrace.join("\n")) if @debug
    sleep @reconnect_interval
    # Re-runs the method body from the top; keeps looping until the
    # connection succeeds, @reconnect is unset, or stop? becomes true.
    retry
  else
    @logger.warn("Failed to connect to stomp server. Exiting with error: #{error.inspect}")
    @logger.debug(error.backtrace.join("\n")) if @debug
  end
end
subscription_handler()
click to toggle source
# File lib/logstash/inputs/stomp.rb, line 75
# Subscribes to @destination, decodes every received message body with
# @codec, and pushes each decorated event onto @output_queue. Afterwards
# it blocks, polling once a second, until stop? returns true.
def subscription_handler
  # Nothing to subscribe to when the connection is not active.
  return unless @client.connected?

  @client.subscribe(@destination) do |message|
    @codec.decode(message.body) do |decoded|
      decorate(decoded)
      @output_queue << decoded
    end
  end

  # Per the original author's note: when Stomp inputs are the only input
  # plugin instances, the process ends prematurely. The subscribe call above
  # returns, control goes back to 'run', and "run_input" in agent.rb then
  # marks 'done' as 'true' and calls 'finish' on this plugin instance.
  # Sleeping here keeps the instance alive.
  sleep 1 until stop?
end