class Cabin::Outputs::ZeroMQ

Output to a zeromq socket.

Constants

CONTEXT
DEFAULTS

Attributes

socket[R]
topic[R]
topology[R]

Public Class Methods

new(addresses, options={}) click to toggle source

Create a new ZeroMQ output.

arguments: addresses A list of addresses to connect to. These are round-robined by zeromq.

:topology Either ‘pushpull’ or ‘pubsub’. Specifies which zeromq socket type to use. Default pushpull. :hwm Specifies the High Water Mark for the socket. Default 0, which means there is none. :linger Specifies the linger time in milliseconds for the socket. Default -1, meaning wait forever for the socket to close. :topic Specifies the topic for a pubsub topology. This can be a string or a proc with the event as the only argument.

# File lib/cabin/outputs/zeromq.rb, line 26
def initialize(addresses, options={})
  options = DEFAULTS.merge(options)

  @topology = options[:topology].to_s
  case @topology
  when "pushpull"
    socket_type = ZMQ::PUSH
  when "pubsub"
    socket_type = ZMQ::PUB
  end

  @topic = options[:topic]
  @socket = CONTEXT.socket(socket_type)
  
  Array(addresses).each do |address|
    error_check @socket.connect(address), "connecting to #{address}"
  end

  error_check @socket.setsockopt(ZMQ::LINGER, options[:linger]), "while setting ZMQ::LINGER to #{options[:linger]}"
  error_check @socket.setsockopt(ZMQ::HWM, options[:hwm]), "while setting ZMQ::HWM to #{options[:hwm]}"

  #TODO use cabin's teardown when it exists
  at_exit do
    teardown
  end

  #define_finalizer
end

Public Instance Methods

<<(event) click to toggle source
# File lib/cabin/outputs/zeromq.rb, line 67
def <<(event)
  if @socket.name == "PUB"
    topic = @topic.is_a?(Proc) ? @topic.call(event) : @topic
    error_check @socket.send_string(topic, ZMQ::SNDMORE), "in topic send_string"
  end
  error_check @socket.send_string(event.inspect), "in send_string"
end
hwm() click to toggle source
# File lib/cabin/outputs/zeromq.rb, line 61
def hwm
  array = []
  error_check @socket.getsockopt(ZMQ::HWM, array), "while getting ZMQ::HWM"
  array.first
end
linger() click to toggle source
# File lib/cabin/outputs/zeromq.rb, line 55
def linger
  array = []
  error_check @socket.getsockopt(ZMQ::LINGER, array), "while getting ZMQ::LINGER"
  array.first
end
teardown() click to toggle source
# File lib/cabin/outputs/zeromq.rb, line 75
def teardown
  @socket.close if @socket
end

Private Instance Methods

error_check(rc, doing) click to toggle source
# File lib/cabin/outputs/zeromq.rb, line 80
def error_check(rc, doing)
  unless ZMQ::Util.resultcode_ok?(rc)
    raise "ZeroMQ Error while #{doing}"
  end
end