class LogStash::Inputs::ZeroMQ

Read events over a 0MQ SUB socket.

You need to have the 0mq 2.1.x library installed to be able to use this input plugin.

With the default settings, the plugin creates a subscriber that binds to `tcp://127.0.0.1:2120` and waits for publishers to connect.

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 133
# Closes the ZeroMQ socket and then terminates the context.
#
# Teardown is best-effort: a RuntimeError raised along the way is logged,
# together with the exception itself so the cause is not lost, instead of
# being propagated to the caller. If closing the socket fails, the context is
# not terminated.
def close
  error_check(@zsocket.close, "while closing the zmq socket")
  context.terminate
rescue RuntimeError => e
  @logger.error("Failed to properly teardown ZeroMQ", :exception => e)
end
init_socket() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 97
# Creates the ZeroMQ socket for the configured topology and prepares it.
#
# The socket type is chosen from @topology ("pair" -> ZMQ::PAIR,
# "pushpull" -> ZMQ::PULL, "pubsub" -> ZMQ::SUB). ZMQ::LINGER is then set to
# 1, any options in @sockopt are applied through setopts, and setup is called
# once for every entry in @address. For the "pubsub" topology the socket is
# finally subscribed to each entry of @topic, or to "" when @topic is nil.
#
# @raise [ArgumentError] if @topology is not one of the values listed above
def init_socket
  zmq_const =
    case @topology
    when "pair" then ZMQ::PAIR
    when "pushpull" then ZMQ::PULL
    when "pubsub" then ZMQ::SUB
    else
      # Fail with a clear message rather than handing nil to context.socket.
      raise ArgumentError, "Unknown ZeroMQ topology: #{@topology.inspect}"
    end

  @zsocket = context.socket(zmq_const)
  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
              "while setting ZMQ::LINGER == 1")

  setopts(@zsocket, @sockopt) if @sockopt

  @address.each { |addr| setup(@zsocket, addr) }

  # Subscriptions only apply to the pubsub topology.
  return unless @topology == "pubsub"

  if @topic.nil?
    @logger.debug("ZMQ - No topic provided. Subscribing to all messages")
    error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""),
                "while setting ZMQ::SUBSCRIBE")
  else
    @topic.each do |t|
      @logger.debug("ZMQ subscribing to topic: #{t}")
      error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t),
                  "while setting ZMQ::SUBSCRIBE == #{t}")
    end
  end
end
register() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 89
# Prepares the plugin for use: loads the ZeroMQ bindings and the shared
# ZeroMQ mixin, mixes that module into this plugin's class, records the local
# hostname in @host and finally opens the socket through init_socket.
def register
  # Loaded here, when the plugin is registered, rather than at file load time.
  require "ffi-rzmq"
  require "logstash/plugin_mixins/zeromq"

  zeromq_mixin = LogStash::PluginMixins::ZeroMQ
  self.class.send(:include, zeromq_mixin)

  @host = Socket.gethostname
  init_socket
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 146
# Main receive loop: keeps calling handle_message until stop? is true.
#
# Any StandardError raised inside the loop is reported at debug level (only
# when debug logging is enabled) and the loop is then restarted from the top.
#
# @param output_queue the queue handed on to handle_message
def run(output_queue)
  handle_message(output_queue) until stop?
rescue => error
  if @logger.debug?
    @logger.debug("ZMQ Error", :subscriber => @zsocket,
                  :exception => error)
  end
  retry
end
server?() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 142
# Reports whether this input is configured in "server" mode.
#
# @return [Boolean] true when @mode equals the string "server"
def server?
  "server" == @mode
end

Private Instance Methods

build_source_string() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 159
# Returns a copy of the first entry in @address.
#
# A clone is returned so that callers modifying the result do not alter the
# value held in @address.
def build_source_string
  @address.first.clone
end
handle_message(output_queue) click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 163
# Receives one message (all of its parts) from the socket and pushes the
# decoded events onto output_queue.
#
# For the "pubsub" topology, when more than one part is received, the first
# part is treated as the topic and stored on every event under @topic_field.
# Each event gets a "host" field (falling back to @host when the event has
# none) and is passed through decorate before being queued.
#
# Returns early, queuing nothing, when ZMQ::Util.resultcode_ok? rejects the
# result code of recv_strings.
#
# @param output_queue [#<<] destination for the decoded events
def handle_message(output_queue)
  parts = []
  rc = @zsocket.recv_strings(parts)
  # NOTE(review): the trailing `true` presumably tells error_check not to
  # treat EAGAIN as an error - confirm against the mixin.
  error_check(rc, "in recv_strings", true)
  return unless ZMQ::Util.resultcode_ok?(rc)

  topic = nil
  if @topology == "pubsub" && parts.length > 1
    # assume topic is a simple string
    topic, *parts = parts
    # Re-tag the topic once here instead of once per decoded event.
    topic.force_encoding('UTF-8')
  end

  parts.each do |msg|
    @codec.decode(msg) do |event|
      event.set("host", event.get("host") || @host)
      event.set(@topic_field, topic) unless topic.nil?
      decorate(event)
      output_queue << event
    end
  end
end