class LogStash::Filters::ZeroMQ

ZeroMQ filter. This is the best way to send an event externally for filtering. It works much like an exec filter would, by sending the event “offsite” for processing and waiting for a response.

The protocol here is:

* REQ sent with JSON-serialized logstash event
* REP read expected to be the full JSON 'filtered' event
  - If the reply read is an empty string, the event is cancelled.

Note that this is a limited subset of the zeromq functionality in inputs and outputs. The only topology that makes sense here is REQ/REP.

Public Class Methods

new(params) click to toggle source
Calls superclass method
# File lib/logstash/filters/zeromq.rb, line 69
# Builds the filter from its configuration parameters.
#
# The parameters are forwarded unchanged to the superclass constructor, and
# the instance is then flagged as not thread-safe (@threadsafe = false).
def initialize(params)
  super

  @threadsafe = false
end

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/logstash/filters/zeromq.rb, line 84
# Releases the ZeroMQ socket held by this filter and then runs the
# superclass close hook.
#
# The socket is only closed when one has been created, so calling this
# before #connect has run is harmless. The method must not call itself:
# a bare `close` here recurses until the stack overflows, and neither the
# socket nor the superclass hook is ever reached.
def close
  @zsocket.close if @zsocket
  super()
end
filter(event) click to toggle source
# File lib/logstash/filters/zeromq.rb, line 177
# Sends the event (or one field of it, when @field is set) to the remote
# ZeroMQ peer via #send_recv and applies the reply to the event.
#
# * When @field is set, only that field's value is sent, and the reply
#   (after event.sprintf expansion) is written back to the same field.
# * Otherwise the whole event is sent as JSON, and the reply is parsed as
#   JSON and used to overwrite the event.
# * A successful reply equal to @sentinel cancels the event.
# * When #send_recv reports failure it hands back the original message, so
#   the event content is re-applied as-is and @add_tag_on_timeout is
#   appended to the event's "tags".
#
# Any StandardError raised along the way is logged as a warning and
# swallowed, so a failing peer does not abort event processing.
#
# @param event the event to filter; it is modified in place
def filter(event)
  # TODO (lusis): Allow filtering multiple fields
  payload = @field ? event.get(@field) : event.to_json
  success, reply = send_recv(payload)

  # If we receive the sentinel value as reply, this is an indication that
  # the remote filter wishes to cancel this event.
  if success && reply == @sentinel
    @logger.debug? && @logger.debug("0mq: recieved sentinel value, cancelling event.")
    event.cancel
    return
  end

  @logger.debug? && @logger.debug("0mq: receiving", :reply => reply)
  if @field
    # Use the get/set event API consistently; the rest of this method
    # already reads with event.get and writes tags with event.set.
    event.set(@field, event.sprintf(reply))
  else
    event.overwrite(LogStash::Event.new(LogStash::Json.load(reply)))
  end

  # Decorate exactly once, whichever branch ran above.
  filter_matched(event)

  # If the message send/recv was not successful, add the timeout tag.
  unless success
    tags = event.get("tags") || []
    tags << @add_tag_on_timeout
    event.set("tags", tags)
  end
rescue => e
  @logger.warn("0mq filter exception", :address => @address, :exception => e, :backtrace => e.backtrace)
end
register() click to toggle source
# File lib/logstash/filters/zeromq.rb, line 76
# Prepares the filter for use.
#
# The ZeroMQ bindings and the shared ZeroMQ plugin mixin are required here
# rather than at file load time, the mixin is included into this class, and
# the socket is then set up via #connect.
def register
  require "ffi-rzmq"
  require "logstash/plugin_mixins/zeromq"

  zeromq_mixin = LogStash::PluginMixins::ZeroMQ
  self.class.send(:include, zeromq_mixin)

  connect
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/filters/zeromq.rb, line 97
# Creates a fresh REQ socket and poller and wires the socket to @address.
#
# Steps, in order:
# 1. open a ZMQ::REQ socket on the shared context and set ZMQ::LINGER to 0
#    (presumably so closing the socket never blocks on unsent messages --
#    confirm against the ZeroMQ docs);
# 2. create a poller and register the socket for ZMQ::POLLIN;
# 3. apply any user-supplied socket options from @sockopt;
# 4. hand the socket and @address to the mixin's setup helper.
def connect
  @logger.debug("0mq: connecting socket")
  @zsocket = context.socket(ZMQ::REQ)

  linger_rc = @zsocket.setsockopt(ZMQ::LINGER, 0)
  error_check(linger_rc, "while setting ZMQ::LINGER == 0)")

  @poller = ZMQ::Poller.new
  @poller.register(@zsocket, ZMQ::POLLIN)

  # TODO: should make sure that ZMQ::LINGER and ZMQ::POLLIN are not changed
  setopts(@zsocket, @sockopt) if @sockopt

  setup(@zsocket, @address)
end
reconnect() click to toggle source
# File lib/logstash/filters/zeromq.rb, line 114
# Re-establishes the connection: calls #close, then #connect, which creates
# a new REQ socket and a new poller. Used by #send_recv after a failed send.
def reconnect
  close
  connect
end
send_recv(message) click to toggle source

Send and receive data. The message is assumed to be JSON. Returns a boolean for success, and a string containing one of several things:

- empty string: response from server
- updated string: response from server
- original message: could not send request or get response from server in time
# File lib/logstash/filters/zeromq.rb, line 125
# Sends +message+ on the REQ socket and waits for the matching reply.
#
# Both phases are attempted up to @retries times:
# * send: a failed send triggers a reconnect before the next attempt;
# * receive: the poller waits @timeout per attempt; a negative poll result
#   triggers a reconnect, a zero result simply polls again.
#
# @param message [String] the payload to send
# @return [Array] a two-element array [success, text]. On success, text is
#   the reply read from the socket (possibly an empty string). On failure,
#   text is the original message, so the caller can carry on with the
#   event unchanged when the server is unavailable.
def send_recv(message)
  # Phase 1: get the request out, reconnecting after every failed attempt.
  sent = @retries.times.any? do
    @logger.debug("0mq: sending", :request => message) if @logger.debug?
    send_rc = @zsocket.send_string(message)
    next true if ZMQ::Util.resultcode_ok?(send_rc)

    if @logger.debug?
      @logger.debug("0mq: error sending message (zmq_errno = #{ZMQ::Util.errno}, zmq_error_string = '#{ZMQ::Util.error_string}'")
    end
    reconnect
    false
  end

  unless sent
    @logger.warn("0mq: error sending message (zmq_errno = #{ZMQ::Util.errno}, zmq_error_string = '#{ZMQ::Util.error_string}'")
    return [false, message]
  end

  # Phase 2: wait for the reply. recv_string fills this buffer in place.
  reply = ''
  received = @retries.times.any? do
    @logger.debug("0mq: polling for reply for #{@timeout}ms.") if @logger.debug?
    # > 0: something to read; < 0: poll error; 0: timed out, try again.
    ready = @poller.poll(@timeout)
    if ready > 0
      recv_rc = @zsocket.recv_string(reply)
      @logger.debug("0mq: message received, checking error") if @logger.debug?
      error_check(recv_rc, "in recv_string")
      true
    else
      reconnect if ready < 0
      false
    end
  end

  # Out of retries: hand back the original message so the event is not
  # cancelled; processing should carry on if the server is down.
  unless received
    @logger.warn("0mq: did not receive reply (zmq_errno = #{ZMQ::Util.errno}, zmq_error_string = '#{ZMQ::Util.error_string}'")
    return [false, message]
  end

  [true, reply]
end
server?() click to toggle source
# File lib/logstash/filters/zeromq.rb, line 213
# Reports whether this plugin instance is configured in "server" mode.
#
# @return [Boolean] true when @mode equals the string "server"
def server?
  "server" == @mode
end