class LogStash::Inputs::ZeroMQ
Read events over a 0MQ SUB socket.
You need to have the 0mq 2.1.x library installed to be able to use this input plugin.
The default settings will create a subscriber binding to `tcp://127.0.0.1:2120` waiting for connecting publishers.
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 133 def close begin error_check(@zsocket.close, "while closing the zmq socket") context.terminate rescue RuntimeError => e @logger.error("Failed to properly teardown ZeroMQ") end end
init_socket()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 97 def init_socket case @topology when "pair" zmq_const = ZMQ::PAIR when "pushpull" zmq_const = ZMQ::PULL when "pubsub" zmq_const = ZMQ::SUB end # case socket_type @zsocket = context.socket(zmq_const) error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1)") if @sockopt setopts(@zsocket, @sockopt) end @address.each do |addr| setup(@zsocket, addr) end if @topology == "pubsub" if @topic.nil? @logger.debug("ZMQ - No topic provided. Subscribing to all messages") error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""), "while setting ZMQ::SUBSCRIBE") else @topic.each do |t| @logger.debug("ZMQ subscribing to topic: #{t}") error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t), "while setting ZMQ::SUBSCRIBE == #{t}") end end end end
register()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 89 def register require "ffi-rzmq" require "logstash/plugin_mixins/zeromq" self.class.send(:include, LogStash::PluginMixins::ZeroMQ) @host = Socket.gethostname init_socket end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 146 def run(output_queue) begin while !stop? handle_message(output_queue) end rescue => e @logger.debug? && @logger.debug("ZMQ Error", :subscriber => @zsocket, :exception => e) retry end # begin end
server?()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 142 def server? @mode == "server" end
Private Instance Methods
build_source_string()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 159 def build_source_string id = @address.first.clone end
handle_message(output_queue)
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 163 def handle_message(output_queue) # Here's the unified receiver more = true parts = [] rc = @zsocket.recv_strings(parts) error_check(rc, "in recv_strings", true) return unless ZMQ::Util.resultcode_ok?(rc) if @topology == "pubsub" && parts.length > 1 # assume topic is a simple string topic, *parts = parts else topic = nil end parts.each do |msg| @codec.decode(msg) do |event| event.set("host", event.get("host") || @host) event.set(@topic_field, topic.force_encoding('UTF-8')) unless topic.nil? decorate(event) output_queue << event end end end