class Cabin::Outputs::ZeroMQ
Output to a zeromq socket.
Constants
- CONTEXT
- DEFAULTS
Attributes
Public Class Methods
Source
# File lib/cabin/outputs/zeromq.rb, line 26 def initialize(addresses, options={}) options = DEFAULTS.merge(options) @topology = options[:topology].to_s case @topology when "pushpull" socket_type = ZMQ::PUSH when "pubsub" socket_type = ZMQ::PUB end @topic = options[:topic] @socket = CONTEXT.socket(socket_type) Array(addresses).each do |address| error_check @socket.connect(address), "connecting to #{address}" end error_check @socket.setsockopt(ZMQ::LINGER, options[:linger]), "while setting ZMQ::LINGER to #{options[:linger]}" error_check @socket.setsockopt(ZMQ::HWM, options[:hwm]), "while setting ZMQ::HWM to #{options[:hwm]}" #TODO use cabin's teardown when it exists at_exit do teardown end #define_finalizer end
Create a new ZeroMQ
output.
arguments: addresses A list of addresses to connect to. These are round-robined by zeromq.
:topology Either ‘pushpull’ or ‘pubsub’. Specifies which zeromq socket type to use. Default pushpull. :hwm Specifies the High Water Mark for the socket. Default 0, which means there is none. :linger Specifies the linger time in milliseconds for the socket. Default -1, meaning wait forever for the socket to close. :topic Specifies the topic for a pubsub topology. This can be a string or a proc with the event as the only argument.
Public Instance Methods
Source
# File lib/cabin/outputs/zeromq.rb, line 67 def <<(event) if @socket.name == "PUB" topic = @topic.is_a?(Proc) ? @topic.call(event) : @topic error_check @socket.send_string(topic, ZMQ::SNDMORE), "in topic send_string" end error_check @socket.send_string(event.inspect), "in send_string" end
Source
# File lib/cabin/outputs/zeromq.rb, line 61 def hwm array = [] error_check @socket.getsockopt(ZMQ::HWM, array), "while getting ZMQ::HWM" array.first end
Source
# File lib/cabin/outputs/zeromq.rb, line 55 def linger array = [] error_check @socket.getsockopt(ZMQ::LINGER, array), "while getting ZMQ::LINGER" array.first end
Source
# File lib/cabin/outputs/zeromq.rb, line 75 def teardown @socket.close if @socket end
Private Instance Methods
Source
# File lib/cabin/outputs/zeromq.rb, line 80 def error_check(rc, doing) unless ZMQ::Util.resultcode_ok?(rc) raise "ZeroMQ Error while #{doing}" end end