class Fluent::WebSocketOutput
Public Instance Methods
buffer(data)
click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 124
# Remembers +data+ in @buffer, keeping at most @buffered_messages entries.
#
# data - the already-encoded message to remember.
#
# Does nothing when @buffered_messages is not greater than zero.
def buffer(data)
  limit = @buffered_messages
  return unless limit > 0

  @buffer << data
  # Keep only the newest `limit` messages once the buffer overflows.
  @buffer = @buffer.last(limit) if @buffer.size > limit
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 33
# Runs the superclass configuration, then starts a background thread that
# runs an EventMachine reactor hosting the WebSocket server on @host:@port.
#
# For every connection that passes doAuth, a callback is subscribed to
# $channel and the currently buffered messages are replayed to the client.
# Connections that fail doAuth are sent the text "Unauthorized" and are not
# subscribed.
#
# conf - the configuration object, handled by the superclass.
def configure(conf)
  super
  @thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run {
      EM::WebSocket.run(:host => @host, :port => @port) do |ws|
        ws.onopen { |handshake|
          $log.info "WebSocket opened #{{ :path => handshake.path, :query => handshake.query, :origin => handshake.origin, }}"
          if doAuth(handshake.query)
            # msgpack payloads are sent as binary frames without filtering;
            # JSON payloads go through sendMsg so query filters are applied.
            callback = @use_msgpack ? proc{|msg| ws.send_binary(msg)} : proc{|msg| sendMsg(handshake.query, ws, msg)}
            $lock.synchronize do
              sid = $channel.subscribe callback
              $log.trace "WebSocket connection: ID " + sid.to_s
              ws.onclose {
                $log.trace "Connection closed: ID " + sid.to_s
                $lock.synchronize do
                  $channel.unsubscribe(sid)
                end
              }
              # Replay through the same callback used for live messages, so
              # msgpack-encoded entries are not handed to the JSON parser in
              # sendMsg. Array() covers a client connecting before start has
              # initialised @buffer.
              Array(@buffer).each do |msg|
                callback.call(msg)
              end
            end
          else
            ws.send("Unauthorized")
          end
          #ws.onmessage { |msg|
          #}
        }
      end
    }
  end
end
doAuth(query)
click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 73
# Decides whether a connection may subscribe.
#
# query - Hash-like object; its "token" entry is compared with @token.
#
# Returns true when no @token is set or when query carries a matching
# "token"; returns false otherwise. The outcome is logged at trace level.
def doAuth(query)
  unless @token.nil? || (query.key?("token") && @token == query["token"])
    $log.trace "Auth failed"
    return false
  end

  $log.trace "Auth OK"
  true
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 110
# Encodes every event in +es+ and publishes it.
#
# tag   - placed first in the payload when @add_tag is set.
# es    - event stream; yields (time, record) pairs from #each.
# chain - its #next is invoked before any event is processed.
#
# Each payload is an array ending with the record, optionally preceded by
# the tag and/or time. It is encoded as msgpack when @use_msgpack is set,
# otherwise as JSON via Yajl, then buffered and pushed to $channel while
# holding $lock.
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    payload = []
    payload << tag if @add_tag
    payload << time if @add_time
    payload << record

    output =
      if @use_msgpack
        payload.to_msgpack
      else
        Yajl::Encoder.encode(payload)
      end

    buffer(output)
    $lock.synchronize { $channel.push output }
  end
end
sendMsg(filters, ws, msg)
click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 83
# Sends a JSON-encoded message to +ws+ when its record satisfies every filter.
#
# filters - Hash of field name => expected value. The 'token' key is always
#           treated as satisfied; every other key must be present in the
#           record with an equal value.
# ws      - object responding to #send(msg).
# msg     - JSON text of an array whose last element is the record Hash.
#
# The record is taken from the last array element because emit builds the
# payload as [record] with the time and/or tag prepended, so the array may
# have one, two or three elements. Messages that do not parse to an array
# ending in a Hash are ignored.
def sendMsg(filters, ws, msg)
  parser = Yajl::Parser.new
  msgStruct = parser.parse(msg)
  return unless msgStruct.is_a?(Array)

  msgContent = msgStruct.last
  return unless msgContent.is_a?(Hash)

  matches = filters.all? do |key, value|
    key == 'token' || (msgContent.key?(key) && msgContent[key] == value)
  end
  ws.send(msg) if matches
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 103
# Runs the superclass shutdown, then stops the EventMachine reactor and
# kills the server thread created by configure.
#
# Both steps are guarded so shutdown does not raise when the reactor is not
# running or when no server thread was ever created.
def shutdown
  super
  EM.stop if EM.reactor_running?
  Thread.kill(@thread) if @thread
  $log.trace "Killed em-websocket thread."
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 98
# Resets the replay buffer to an empty array, then delegates to the
# superclass start.
def start
  @buffer = []
  super
end