class Fluent::ZmqSubInput

Attributes

subkeys[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_zmq_sub.rb, line 14
# Builds the plugin instance.
#
# Runs the parent initializer first, then loads the ffi-rzmq library so the
# ZMQ constants referenced by the other methods are available.
def initialize
  super
  # Loaded here rather than at file load time (deliberate lazy require).
  require "ffi-rzmq"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_zmq_sub.rb, line 19
# Applies the plugin configuration.
#
# After the parent class has processed +conf+, the comma-separated value held
# in @subkey is broken into the @subkeys list. Entries are kept verbatim: no
# whitespace trimming and no removal of empty entries is performed.
#
# NOTE(review): @subkey is presumably populated by the parent's configure
# from a config parameter declared elsewhere in the file - confirm.
#
# conf - the configuration object handed over by the caller.
def configure(conf)
  super
  @subkeys = @subkey.split(',')
end
run() click to toggle source
# File lib/fluent/plugin/in_zmq_sub.rb, line 37
# Worker loop executed on the plugin's background thread.
#
# Opens a SUB socket on @context, connects it to @publisher and subscribes to
# every entry of @subkeys (or to the empty prefix when the list is empty).
# It then polls the socket forever, decoding each received message and
# handing the result to Engine.
#
# Message layout, as consumed here: "<key> <MessagePack payload>". The key
# part is discarded. When @bulk_send is set and the decoded payload's first
# element is an Array, the payload is treated as a list of
# [tag, time, record] triples and emitted as one event stream per run of
# consecutive identical tags; otherwise the decoded payload is splatted
# straight into Engine.emit.
#
# A failure while handling one message is logged and the loop continues. Any
# other StandardError ends the loop and is logged. The socket is closed on
# the way out in every case.
def run
  @subscriber = @context.socket(ZMQ::SUB)
  @subscriber.connect(@publisher)

  # No configured keys means "subscribe to the empty prefix".
  topics = @subkeys.size > 0 ? @subkeys : ['']
  topics.each { |topic| @subscriber.setsockopt(ZMQ::SUBSCRIBE, topic) }

  loop do
    payload = ''
    # Drain everything that is currently readable; an untouched (empty)
    # buffer ends the drain.
    while @subscriber.recv_string(payload, ZMQ::DONTWAIT) && !payload.empty?
      begin
        _key, packed = payload.split(" ", 2)
        entries = MessagePack.unpack(packed)
        if @bulk_send && entries[0].instance_of?(Array)
          stream = MultiEventStream.new
          current_tag = nil
          entries.each do |tag, time, record|
            # Tag changed: flush what was collected and start a new stream.
            if current_tag && current_tag != tag
              Engine.emit_stream(current_tag, stream)
              stream = MultiEventStream.new
            end
            stream.add(time, record)
            current_tag = tag
          end
          Engine.emit_stream(current_tag, stream) unless stream.to_a.empty?
        else
          Engine.emit(*entries)
        end
      rescue => e
        log.warn "Error in processing message.", :error_class => e.class, :error => e
        log.warn_backtrace
      end
      # Fresh buffer for the next receive attempt.
      payload = ''
    end
    # Nothing left to read right now; pause before polling again.
    sleep(0.1)
  end
rescue => e
  log.error "error occurred while executing plugin.", :error_class => e.class, :error => e
  log.warn_backtrace
ensure
  @subscriber.close if @subscriber
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_zmq_sub.rb, line 30
# Stops the plugin.
#
# Kills the worker thread and waits for it to finish, terminates the ZMQ
# context, then delegates to the parent's shutdown.
#
# The thread and the context are only touched when they exist. Without these
# guards a shutdown on a plugin whose start never completed would raise
# (Thread.kill rejects nil) and the parent's shutdown would be skipped.
def shutdown
  if @thread
    Thread.kill(@thread)
    # Wait for the worker to finish so it is no longer running when the
    # context is terminated.
    @thread.join
  end
  @context.terminate if @context
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_zmq_sub.rb, line 24
# Starts the plugin.
#
# After the parent's start has run, a new ZMQ context is stored in @context
# and a background thread executing #run is stored in @thread.
def start
  super
  @context = ZMQ::Context.new
  @thread = Thread.new { run }
end