class LogStash::Filters::ZeroMQ
ZeroMQ
filter. This is the best way to send an event externally for filtering. It works much like an exec filter would, by sending the event “offsite” for processing and waiting for a response.
The protocol here is:
* REQ sent with the JSON-serialized logstash event
* REP read, expected to be the full JSON 'filtered' event
* If the reply read is an empty string, the event is cancelled.
Note that this is a limited subset of the zeromq functionality in inputs and outputs. The only topology that makes sense here is REQ/REP.
Public Class Methods
new(params)
click to toggle source
Calls superclass method
# File lib/logstash/filters/zeromq.rb, line 69
# Builds the filter from its configuration parameters.
#
# params - configuration passed straight through to the superclass
#          constructor.
#
# After the superclass has been initialised the plugin flags itself as
# not thread-safe by setting @threadsafe to false.
def initialize(params)
  super
  @threadsafe = false
end
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/logstash/filters/zeromq.rb, line 84
# Releases the 0mq socket held by this filter and then runs the
# superclass close hook.
#
# The socket is the one stored in @zsocket by #connect. When no socket has
# been created yet (@zsocket is nil) only the superclass hook runs.
#
# Returns whatever the superclass #close returns.
def close
  # Close the socket itself. Calling `close` again from here (as the body
  # used to) re-enters this very method and recurses until the stack
  # overflows, without ever releasing the socket.
  @zsocket.close if @zsocket
  super()
end
filter(event)
click to toggle source
# File lib/logstash/filters/zeromq.rb, line 177
# Sends the event to the remote 0mq endpoint and applies the reply.
#
# When @field is set only that field's value is sent and the (sprintf
# expanded) reply is written back to the same field; otherwise the whole
# event is sent as JSON and the event is overwritten with the parsed reply.
#
# event - the event being filtered; it is modified in place.
#
# Outcomes:
# * successful reply equal to @sentinel: the event is cancelled and nothing
#   else happens.
# * successful reply: the reply is applied and filter_matched is called once.
# * failed round-trip (send_recv reports false): the event content is left
#   untouched, filter_matched is still called, and @add_tag_on_timeout is
#   appended to the event's "tags".
#
# Any StandardError raised along the way is logged as a warning and
# swallowed so a misbehaving remote cannot abort the pipeline.
def filter(event)
  # TODO (lusis): Allow filtering multiple fields
  payload = @field ? event.get(@field) : event.to_json
  success, reply = send_recv(payload)

  # If we receive the sentinel value as reply, this is an indication that
  # the filter wishes to cancel this event.
  if success && reply == @sentinel
    @logger.debug? && @logger.debug("0mq: recieved sentinel value, cancelling event.")
    event.cancel
    return
  end

  @logger.debug? && @logger.debug("0mq: receiving", :reply => reply)

  # On failure send_recv hands back the original message, so there is
  # nothing new to apply; re-applying it would only run sprintf over the
  # untouched field value or round-trip the event through JSON.
  if success
    if @field
      # Use the get/set event API consistently; the legacy `event[...] =`
      # setter is not the accessor used anywhere else in this method.
      event.set(@field, event.sprintf(reply))
    else
      event.overwrite(LogStash::Event.new(LogStash::Json.load(reply)))
    end
  end
  filter_matched(event)

  # If message send/recv was not successful, add the timeout tag.
  unless success
    tags = event.get("tags") || []
    tags << @add_tag_on_timeout
    event.set("tags", tags)
  end
rescue => e
  @logger.warn("0mq filter exception", :address => @address, :exception => e, :backtrace => e.backtrace)
end
register()
click to toggle source
# File lib/logstash/filters/zeromq.rb, line 76
# Prepares the filter for use: loads the 0mq bindings and the shared
# plugin mixin, mixes that module into this class, then opens the socket
# via #connect.
def register
  # Both libraries are required here, at registration time, rather than
  # when the file is loaded.
  require "ffi-rzmq"
  require "logstash/plugin_mixins/zeromq"

  # `include` is invoked through `send`, exactly as before.
  # NOTE(review): presumably this mixin supplies context/setup/setopts/
  # error_check used by #connect — confirm against the mixin source.
  mixin = LogStash::PluginMixins::ZeroMQ
  self.class.send(:include, mixin)

  connect
end
Private Instance Methods
connect()
click to toggle source
# File lib/logstash/filters/zeromq.rb, line 97
# Creates a fresh REQ socket, registers it with a new poller for readable
# events, applies any user socket options and hands the socket to `setup`
# together with @address.
#
# Stores the socket in @zsocket and the poller in @poller. Returns the
# result of `setup`.
def connect
  @logger.debug("0mq: connecting socket")
  @zsocket = context.socket(ZMQ::REQ)

  # LINGER is set to 0 and the return code is validated by error_check.
  linger_rc = @zsocket.setsockopt(ZMQ::LINGER, 0)
  error_check(linger_rc, "while setting ZMQ::LINGER == 0)")

  @poller = ZMQ::Poller.new
  @poller.register(@zsocket, ZMQ::POLLIN)

  #TODO: should make sure that ZMQ::LINGER and ZMQ::POLLIN are not changed
  setopts(@zsocket, @sockopt) if @sockopt

  setup(@zsocket, @address)
end
reconnect()
click to toggle source
# File lib/logstash/filters/zeromq.rb, line 114
# Tears the current connection down with #close and immediately builds a
# new one with #connect. Returns whatever #connect returns.
def reconnect
  close    # drop the existing connection first
  connect  # then create a new socket and poller
end
send_recv(message)
click to toggle source
Send and receive data. The message is assumed to be JSON. Returns a boolean for success and a string containing one of several things:
- empty string: response from server
- updated string: response from server
- original message: could not send the request or get a response from the server in time
# File lib/logstash/filters/zeromq.rb, line 125
# Performs one request/reply exchange over @zsocket.
#
# message - the string to send.
#
# Returns a two-element result: a success flag and a string.
# * [true, reply]    - the request was sent and a reply was read.
# * [false, message] - the request could not be sent, or no reply was read,
#                      within @retries attempts; the original message is
#                      handed back so the caller does not mistake the
#                      failure for a cancel.
#
# Sending is attempted up to @retries times, reconnecting after each failed
# attempt. Receiving polls up to @retries times for @timeout each; a poll
# error triggers a reconnect, a poll that reports nothing readable simply
# moves on to the next attempt.
def send_recv(message)
  # --- send phase ---
  sent = false
  @retries.times do
    @logger.debug? && @logger.debug("0mq: sending", :request => message)
    send_rc = @zsocket.send_string(message)
    if ZMQ::Util.resultcode_ok?(send_rc)
      sent = true
      break
    end
    @logger.debug? && @logger.debug("0mq: error sending message (zmq_errno = #{ZMQ::Util.errno}, zmq_error_string = '#{ZMQ::Util.error_string}'")
    reconnect
  end

  # If we did not manage to send, log it and fail here.
  unless sent
    @logger.warn("0mq: error sending message (zmq_errno = #{ZMQ::Util.errno}, zmq_error_string = '#{ZMQ::Util.error_string}'")
    return false, message
  end

  # --- receive phase ---
  reply = ''
  received = false
  @retries.times do
    @logger.debug? && @logger.debug("0mq: polling for reply for #{@timeout}ms.")
    # Poll the socket: > 0 means something to read, < 0 means error,
    # zero means nothing yet so go round again.
    ready = @poller.poll(@timeout)
    if ready > 0
      # recv_string fills `reply` in place; error_check validates the code.
      recv_rc = @zsocket.recv_string(reply)
      @logger.debug? && @logger.debug("0mq: message received, checking error")
      error_check(recv_rc, "in recv_string")
      received = true
      break
    elsif ready < 0
      # Poll error: close and connect again.
      reconnect
    end
  end

  # If we maxed out on retries, return the original message so that the
  # event isn't cancelled; we want to carry on if the server is down.
  unless received
    @logger.warn("0mq: did not receive reply (zmq_errno = #{ZMQ::Util.errno}, zmq_error_string = '#{ZMQ::Util.error_string}'")
    return false, message
  end

  return true, reply
end
server?()
click to toggle source
# File lib/logstash/filters/zeromq.rb, line 213
# Predicate: true when @mode equals the string "server", false otherwise.
def server?
  @mode == "server"
end