class Framed::Emitters::Buffered

Constants

MAX_QUEUE_SIZE
MAX_REQUEST_BATCH_SIZE

Public Class Methods

new(client) click to toggle source
Calls superclass method Framed::Emitters::Base::new
# File lib/framed/emitters.rb, line 69
# Sets up the queues and locks used to buffer events and hand batches to
# the background request thread.
#
# @param client the client object; it is forwarded unchanged to the
#   superclass initializer via the bare `super` call
def initialize(client)
  super

  # The worker thread does not exist yet; nothing is spawned here.
  @request_thread = nil

  # Individual events waiting to be batched, guarded by @batch_lock.
  @batch_lock = Mutex.new
  @event_queue = Queue.new

  # Batches waiting to be transmitted; @request_pending is held while a
  # batch is being sent.
  @request_pending = Mutex.new
  @request_queue = Queue.new
end

Public Instance Methods

enqueue(event) click to toggle source
# File lib/framed/emitters.rb, line 123
# Adds an event to the buffer and, unless a request is already being
# transmitted, asks for a new batch request to be started.
#
# When the buffer already holds MAX_QUEUE_SIZE events the new event is
# dropped and a warning is logged.
#
# @param event the event to buffer
# @return whatever #start_request returns, or nil when a request is
#   already in flight
def enqueue(event)
  queue_full = false
  request_in_flight = false

  @batch_lock.synchronize do
    # Loggers can block, so only record the outcome while holding the lock
    # and do the actual logging after it has been released.
    queue_full = @event_queue.length >= MAX_QUEUE_SIZE
    @event_queue << event unless queue_full

    # Remember (rather than return on) an in-flight request so the
    # dropped-event warning below is never skipped.
    request_in_flight = @request_pending.locked?
  end

  # warn_full requires the dropped event as its argument.
  warn_full(event) if queue_full

  # Don't start a new request if one is already in progress.
  start_request unless request_in_flight
end
start() click to toggle source
# File lib/framed/emitters.rb, line 80
# Starts the background request thread.
#
# The thread loops forever: it blocks until a batch is available on
# @request_queue, transmits it while holding @request_pending, and then
# calls #start_request to batch up any events that arrived meanwhile.
#
# Calling this while a worker is already alive is a no-op, so a repeated
# call cannot leave an orphaned second worker consuming the same queue.
#
# @return [Thread] the running request thread
def start
  current = @request_thread
  return current if current && current.alive?

  # A non-nil thread at this point is one that has died.
  Framed.log_info('Starting request thread due to dead thread') if current

  @request_thread = Thread.new do
    loop do
      pending = @request_queue.pop

      # Held for the duration of the send so other methods can tell that a
      # request is in flight via @request_pending.locked?.
      @request_pending.synchronize { transmit(pending) }

      start_request
    end
  end
end
stop(drain = false) click to toggle source
# File lib/framed/emitters.rb, line 98
# Stops the background request thread.
#
# @param drain [Boolean] when true, first batch up every buffered event,
#   wait for the queued batches to be picked up, and only stop the thread
#   once the request lock can be acquired; when false, stop immediately
# @return [nil]
def stop(drain = false)
  return stop_request_thread unless drain

  # Turn every buffered event into a queued batch request.
  start_request until @event_queue.empty?

  # Poll until all queued batches have been taken off the request queue.
  sleep(0.1) until @request_queue.empty?

  # Acquiring the lock waits for a request that is currently being sent.
  @request_pending.synchronize { stop_request_thread }
end
warn_full(event) click to toggle source
# File lib/framed/emitters.rb, line 119
# Logs an error saying that the given event was dropped because the queue
# is full.
#
# @param event the event that was dropped; interpolated into the message
def warn_full(event)
  message = "Queued #{event} to Framed, but queue is full. Dropping event."
  Framed.log_error(message)
end

Private Instance Methods

ensure_request_thread() click to toggle source
# File lib/framed/emitters.rb, line 147
# Makes sure a live request thread exists, starting one under the
# @request_pending lock when there is none or the previous one has died.
#
# The instance variable is copied into a local before it is inspected so
# that a concurrent #stop_request_thread setting it to nil between the
# truthiness check and the #alive? call cannot raise NoMethodError.
def ensure_request_thread
  thread = @request_thread
  return if thread && thread.alive?

  @request_pending.synchronize do
    # Re-check now that the lock is held: another caller may have started
    # the thread while this one was waiting for the lock.
    thread = @request_thread
    start unless thread && thread.alive?
  end
end
start_request() click to toggle source
# File lib/framed/emitters.rb, line 162
# Moves up to MAX_REQUEST_BATCH_SIZE buffered events into a single batch
# and places that batch on the request queue for the worker thread.
#
# Ensures the worker thread exists first. Does nothing further when no
# events are buffered.
#
# @return the request queue when a batch was queued, nil otherwise
def start_request
  ensure_request_thread

  @batch_lock.synchronize do
    next if @event_queue.empty?

    batch = []
    # Stop at the batch size limit or when the buffer runs dry.
    batch << @event_queue.pop until batch.length >= MAX_REQUEST_BATCH_SIZE || @event_queue.empty?

    @request_queue << batch
  end
end
stop_request_thread() click to toggle source
# File lib/framed/emitters.rb, line 155
# Kills the request thread, if there is one, and forgets the reference.
#
# @return [nil]
def stop_request_thread
  return unless @request_thread

  @request_thread.kill
  @request_thread = nil
end
transmit(events) click to toggle source
# File lib/framed/emitters.rb, line 177
# Sends a batch of events through the client, logging (rather than
# raising) any StandardError the client raises.
#
# @param events the batch to send; nil/false or a zero-length batch is
#   ignored
# @return the result of @client.track, the result of Framed.log_error when
#   tracking failed, or nil when there was nothing to send
def transmit(events)
  return unless events
  return unless events.length > 0

  # Only the client call is protected; a failure here must not propagate
  # to the caller.
  begin
    @client.track(events)
  rescue StandardError => error
    Framed.log_error("#transmit failed: #{error}")
  end
end