class Fluent::WebSocketOutput

Public Instance Methods

buffer(data) click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 124
# Remembers +data+ so it can be replayed to clients that connect later.
#
# Nothing is stored unless @buffered_messages is positive. Otherwise +data+ is
# appended to @buffer and only the newest @buffered_messages entries are kept.
#
# @param data the already-encoded message to remember
# @return [Array, nil] the trimmed buffer when trimming happened, otherwise nil
def buffer(data)
  limit = @buffered_messages
  return unless limit > 0

  @buffer << data
  return unless @buffer.length > limit

  # Keep only the most recent +limit+ messages.
  @buffer = @buffer.last(limit)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 33
# Configures the plugin and starts the em-websocket server on a background
# thread.
#
# Every connecting client is logged and then checked with doAuth against its
# handshake query. Authorized clients are subscribed to $channel and are sent
# the messages currently held in @buffer; all others are sent the text
# "Unauthorized".
#
# @param conf the plugin configuration, forwarded unchanged to the superclass
# @return [Thread] the thread running the EventMachine reactor
def configure(conf)
  super
  # The server thread may accept a client before #start has created the
  # buffer, so make sure there is always something to iterate over.
  @buffer ||= []
  @thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run {
      EM::WebSocket.run(:host => @host, :port => @port) do |ws|
        ws.onopen { |handshake|
          $log.info "WebSocket opened #{{
            :path => handshake.path,
            :query => handshake.query,
            :origin => handshake.origin,
          }}"
          if doAuth(handshake.query)
            # msgpack payloads go out as binary frames; JSON payloads go
            # through sendMsg so the client's query filters are applied.
            callback = @use_msgpack ? proc{|msg| ws.send_binary(msg)} : proc{|msg| sendMsg(handshake.query, ws, msg)}
            # Subscribing and replaying happen under the same lock that emit
            # takes while pushing to the channel.
            $lock.synchronize do
              sid = $channel.subscribe callback
              $log.trace "WebSocket connection: ID " + sid.to_s
              ws.onclose {
                $log.trace "Connection closed: ID " + sid.to_s
                $lock.synchronize do
                  $channel.unsubscribe(sid)
                end
              }
              # Replay through the same callback used for live messages.
              # Calling sendMsg directly would JSON-parse msgpack payloads
              # when @use_msgpack is enabled.
              @buffer.each do |msg|
                callback.call(msg)
              end
            end
          else
            ws.send("Unauthorized")
          end

          #ws.onmessage { |msg|
          #}
        }
      end
    }
  end
end
doAuth(query) click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 73
# Decides whether a client may subscribe, based on its handshake query.
#
# Access is granted when no @token is configured, or when +query+ has a
# "token" entry equal to @token. The outcome is traced to $log.
#
# @param query the handshake query parameters (must respond to key? and [])
# @return [Boolean] true when the client is authorized, false otherwise
def doAuth(query)
  token_required = !@token.nil?
  if token_required && !(query.key?("token") && @token == query["token"])
    $log.trace "Auth failed"
    return false
  end

  $log.trace "Auth OK"
  true
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 110
# Encodes every event in +es+ and publishes it to the WebSocket subscribers.
#
# Each event is turned into an array that ends with the record, preceded by
# the time when @add_time is set and by the tag when @add_tag is set. The array
# is encoded with msgpack when @use_msgpack is set and as JSON otherwise, then
# handed to buffer and pushed onto $channel while holding $lock.
#
# @param tag the tag of the event stream
# @param es the event stream; yields time/record pairs from each
# @param chain the output chain; advanced before any event is processed
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    payload = []
    payload << tag if @add_tag
    payload << time if @add_time
    payload << record

    encoded = if @use_msgpack
                payload.to_msgpack
              else
                Yajl::Encoder.encode(payload)
              end

    buffer(encoded)
    $lock.synchronize { $channel.push(encoded) }
  end
end
sendMsg(filters, ws, msg) click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 83
# Sends +msg+ to +ws+ when its content satisfies every entry of +filters+.
#
# +msg+ is parsed as JSON and ignored unless the decoded value has exactly two
# elements; the second element is the content the filters are checked
# against. A filter entry matches when its key is 'token', or when the content
# has that key with an equal value.
#
# @param filters key/value pairs taken from the client's handshake query
# @param ws the WebSocket connection to send on
# @param msg the JSON-encoded message
def sendMsg(filters, ws, msg)
  decoded = Yajl::Parser.new.parse(msg)
  return unless decoded.length == 2

  content = decoded[1]
  accepted = filters.all? do |key, value|
    key == 'token' || (content.key?(key) && content[key] == value)
  end

  ws.send(msg) if accepted
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 103
# Shuts the plugin down: runs the superclass shutdown, stops the EventMachine
# reactor and kills the server thread created by configure.
def shutdown
  super
  # The reactor may already be gone (for example if the server thread died),
  # and stopping a reactor that is not running would abort the rest of the
  # shutdown with an error.
  EM.stop if EM.reactor_running?
  # @thread is only set by configure; Thread.kill does not accept nil.
  Thread.kill(@thread) if @thread
  $log.trace "Killed em-websocket thread."
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 98
# Starts the plugin with an empty replay buffer.
#
# The buffer is assigned before the superclass start runs.
def start
  @buffer = []
  super()
end