class Framed::Emitters::Buffered
Constants
- MAX_QUEUE_SIZE
- MAX_REQUEST_BATCH_SIZE
Public Class Methods
new(client)
click to toggle source
Calls superclass method
Framed::Emitters::Base::new
# File lib/framed/emitters.rb, line 69
# Builds the in-memory state used to buffer events and hand them to a
# background request thread. No thread is created here.
#
# @param client [Object] forwarded unchanged to the superclass initializer
#   (presumably stored as @client, which #transmit later calls #track on --
#   confirm against Framed::Emitters::Base).
def initialize(client)
  # Bare `super` re-passes the arguments this method received.
  super

  # Mutexes: @batch_lock guards @event_queue; @request_pending is held by
  # the worker while a batch is being transmitted.
  @batch_lock = Mutex.new
  @request_pending = Mutex.new

  # Queues: single events waiting to be batched, and batches waiting to be
  # sent by the worker.
  @event_queue = Queue.new
  @request_queue = Queue.new

  # Handle of the worker thread; stays nil until one is started.
  @request_thread = nil
end
Public Instance Methods
enqueue(event)
click to toggle source
# File lib/framed/emitters.rb, line 123
# Adds an event to the buffer and, unless a request is already in flight,
# kicks off a new batch request.
#
# When the buffer already holds MAX_QUEUE_SIZE events the event is dropped
# and the drop is reported through #warn_full. The drop is reported even
# when a request is currently pending, so events are never discarded
# silently.
#
# @param event [Object] the event to buffer
# @return [Object, nil] whatever #start_request returns, or nil when a
#   request was already in progress
def enqueue(event)
  queue_full = false
  request_in_flight = false

  @batch_lock.synchronize do
    # To avoid logging inside the lock (since loggers can block) we only
    # record what happened here and act on it after the lock is released.
    queue_full = @event_queue.length >= MAX_QUEUE_SIZE
    @event_queue << event unless queue_full

    # Don't start a new request if one is already in progress.
    request_in_flight = @request_pending.locked?
  end

  # #warn_full requires the dropped event as its argument.
  warn_full(event) if queue_full
  start_request unless request_in_flight
end
start()
click to toggle source
# File lib/framed/emitters.rb, line 80
# Spawns the background request thread and remembers it in @request_thread.
#
# The thread loops forever: it blocks until a batch is available on
# @request_queue, transmits it while holding @request_pending, and then
# calls #start_request again.
#
# If a previous thread handle exists but that thread is no longer alive,
# an info message is logged before the replacement is created.
#
# @return [Thread] the newly created request thread
def start
  previous = @request_thread
  Framed.log_info('Starting request thread due to dead thread') if previous && !previous.alive?

  @request_thread = Thread.new do
    loop do
      batch = @request_queue.pop
      # The lock is held only for the duration of the transmission.
      @request_pending.synchronize { transmit(batch) }
      start_request
    end
  end
end
stop(drain = false)
click to toggle source
# File lib/framed/emitters.rb, line 98
# Stops the background request thread.
#
# @param drain [Boolean] when truthy, buffered events are first turned into
#   batch requests and the method waits for queued batches to be picked up
#   before the thread is stopped; when falsy the thread is stopped
#   immediately.
# @return [nil] the result of #stop_request_thread
def stop(drain = false)
  return stop_request_thread unless drain

  # Turn everything still buffered into batch requests.
  start_request until @event_queue.empty?

  # Poll until every queued batch has been taken off the request queue.
  sleep(0.1) until @request_queue.empty?

  # Taking the lock waits for a transmission that currently holds it.
  @request_pending.synchronize { stop_request_thread }
end
warn_full(event)
click to toggle source
# File lib/framed/emitters.rb, line 119
# Logs an error saying that the given event is being dropped because the
# queue is full.
#
# @param event [Object] the dropped event; interpolated into the message
# @return [Object] whatever Framed.log_error returns
def warn_full(event)
  message = "Queued #{event} to Framed, but queue is full. Dropping event."
  Framed.log_error(message)
end
Private Instance Methods
ensure_request_thread()
click to toggle source
# File lib/framed/emitters.rb, line 147
# Makes sure a live request thread exists, starting one if necessary.
#
# Liveness is checked once without the lock (cheap common case) and once
# more after @request_pending has been acquired. Without the second check,
# two callers that both observed a missing or dead thread would each call
# #start; the first worker would then be replaced in @request_thread and
# never stopped.
#
# @return [Thread, nil] the thread returned by #start when one was
#   started, otherwise nil
def ensure_request_thread
  return if @request_thread && @request_thread.alive?

  @request_pending.synchronize do
    # Another caller may have started the thread while we waited for the lock.
    start unless @request_thread && @request_thread.alive?
  end
end
start_request()
click to toggle source
# File lib/framed/emitters.rb, line 162
# Moves up to MAX_REQUEST_BATCH_SIZE buffered events into a single batch
# and places that batch on the request queue.
#
# The request thread is ensured first; batching happens while holding
# @batch_lock. Nothing is queued when there are no buffered events.
#
# @return [Queue, nil] the request queue after the batch was appended, or
#   nil when there was nothing to send
def start_request
  ensure_request_thread

  @batch_lock.synchronize do
    return if @event_queue.empty?

    batch = []
    # Stop as soon as the batch is full or the buffer runs dry.
    until batch.length >= MAX_REQUEST_BATCH_SIZE || @event_queue.empty?
      batch << @event_queue.pop
    end

    @request_queue << batch
  end
end
stop_request_thread()
click to toggle source
# File lib/framed/emitters.rb, line 155
# Kills the request thread, if there is one, and forgets its handle.
#
# @return [nil]
def stop_request_thread
  return unless @request_thread

  @request_thread.kill
  @request_thread = nil
end
transmit(events)
click to toggle source
# File lib/framed/emitters.rb, line 177
# Sends a batch of events through the client.
#
# Nothing is sent for a nil/false or empty batch. Any StandardError raised
# by the client is logged and swallowed so that the caller keeps running.
#
# @param events [Array, nil] the batch to send
# @return [Object, nil] the result of @client.track, the result of
#   Framed.log_error when tracking failed, or nil when nothing was sent
def transmit(events)
  return unless events
  return unless events.length > 0

  begin
    @client.track(events)
  rescue StandardError => error
    Framed.log_error("#transmit failed: #{error}")
  end
end