class Stackify::MsgsQueue

Constants

CHUNK_MIN_WEIGHT
DELAY_WAITING
ERROR_SIZE
LOG_SIZE

Attributes

worker[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/stackify/msgs_queue.rb, line 12
# Builds the queue and immediately launches its background worker.
#
# The configured +queue_max_size+ is forwarded to the superclass
# constructor; the worker is started via #start_worker once the queue
# itself has been set up.
def initialize
  max_size = Stackify.configuration.queue_max_size
  super(max_size)
  start_worker
end

Public Instance Methods

<<(msg)
Alias for: add_msg
add_msg(msg) click to toggle source
# File lib/stackify/msgs_queue.rb, line 43
# Enqueues a message, (re)starting the background worker first when it is
# missing or no longer alive.
#
# With the default transport the push is wrapped in
# +do_only_if_authorized_and_mode_is_on+ for the logging mode; for the
# unix-socket and agent-HTTP transports the message is pushed directly.
#
# @param msg the message to enqueue
# @return the value of the synchronized push, or +nil+ when no worker
#   could be started and the message was dropped
def add_msg(msg)
  # start_worker leaves @worker unset when logging mode is off, so the
  # worker must never be dereferenced without a nil check.
  worker_alive = @worker ? @worker.alive? : false
  Stackify.internal_log :debug, "[MsgsQueue] add_msg() Is worker <#{@worker && @worker.name}> alive? = #{worker_alive}"
  unless worker_alive
    start_worker
    # Still no worker: nothing visible here would consume the queue, so
    # drop the message instead of raising NoMethodError on nil.
    return unless @worker
    Stackify.internal_log :debug, "[MsgsQueue] add_msg() Newly created worker <#{@worker.name}>"
  end
  self.synchronize do
    case Stackify.configuration.transport
    when Stackify::DEFAULT
      Stackify::Utils.do_only_if_authorized_and_mode_is_on Stackify::MODES[:logging] do
        old_push(msg)
      end
    when Stackify::UNIX_SOCKET, Stackify::AGENT_HTTP
      old_push(msg)
    end
  end
end
Also aliased as: <<, push
old_push(msg)
Alias for: push
push(msg)
Also aliased as: old_push
Alias for: add_msg
push_remained_msgs() click to toggle source
# File lib/stackify/msgs_queue.rb, line 30
# Flushes whatever is still queued: waits for in-flight message producers,
# shuts Stackify down, and sends all remaining messages in one batch.
#
# The status is switched to +terminated+ only when there was something
# left to send.
#
# @return the value of the synchronized block (the new status when
#   messages were sent, otherwise +nil+)
def push_remained_msgs
  # @worker is never assigned when start_worker found logging disabled;
  # guard it so the final flush cannot die on a nil dereference.
  worker_alive = @worker ? @worker.alive? : false
  Stackify.internal_log :debug, "[MsgsQueue] push_remained_msgs() alive? = #{worker_alive}"
  wait_until_all_workers_will_add_msgs
  self.synchronize do
    Stackify.internal_log :info, '[MsgsQueue] All remained logs are going to be sent'
    Stackify.shutdown_all
    if self.length > 0
      Stackify.get_transport.send_logs(pop_all)
      Stackify.status = Stackify::STATUSES[:terminated]
    end
  end
end
start_worker() click to toggle source
# File lib/stackify/msgs_queue.rb, line 19
# Creates a fresh send-interval tracker and worker, then schedules the
# periodic send task on that worker.
#
# When the logging mode is off nothing is created; a warning is written to
# the internal log instead and @worker / @send_interval are left untouched.
#
# @return the result of +async_perform+, or of the warning log call when
#   logging is disabled
def start_worker
  unless Stackify::Utils.is_mode_on?(Stackify::MODES[:logging])
    return Stackify.internal_log(:warn, '[MsgsQueue]: Logging is disabled at configuration!')
  end

  @send_interval = ScheduleDelay.new
  @worker = MsgsQueueWorker.new
  @worker.async_perform(@send_interval, update_send_interval_task)
end

Private Instance Methods

calculate_processed_msgs_count() click to toggle source
# File lib/stackify/msgs_queue.rb, line 97
# Sends chunks back-to-back and adds up what #push_one_chunk reports.
#
# At least one chunk is always attempted. Another one follows for as long
# as the previous chunk reported 50 or more.
# NOTE(review): 50 presumably mirrors CHUNK_MIN_WEIGHT - confirm.
#
# @return the sum of the values returned by #push_one_chunk
def calculate_processed_msgs_count
  total = 0
  more_pending = true
  while more_pending
    sent = push_one_chunk
    more_pending = sent >= 50
    total += sent
  end
  total
end
pop_all() click to toggle source
# File lib/stackify/msgs_queue.rb, line 66
# Empties the queue under the lock and hands back everything that was in it.
#
# @return [Array] the popped messages, in the order they were popped
def pop_all
  synchronize do
    drained = []
    drained << pop until empty?
    drained
  end
end
push_one_chunk() click to toggle source
# File lib/stackify/msgs_queue.rb, line 108
# Pops messages into a single chunk and sends that chunk through the
# current transport.
#
# Each popped message adds ERROR_SIZE to the chunk weight when the
# transport reports it as an error, otherwise LOG_SIZE. Collection stops
# when the queue is empty, when a message's epoch is later than the moment
# this call started, or when the accumulated weight reaches
# CHUNK_MIN_WEIGHT.
#
# @return the accumulated weight of the chunk that was sent (0 when the
#   queue was empty and nothing was sent)
def push_one_chunk
  chunk_weight = 0
  chunk = []
  started_at = Time.now.to_f * 1000 # call start, in epoch milliseconds
  self.synchronize do
    while length > 0
      msg = pop
      chunk << msg
      chunk_weight += Stackify.get_transport.has_error(msg) ? ERROR_SIZE : LOG_SIZE
      # The weight test used to compare CHUNK_MIN_WEIGHT against a literal,
      # which never changes inside the loop, so chunk_weight never limited
      # the chunk. Compare the running weight against the threshold.
      break if Stackify.get_transport.get_epoch(msg) > started_at || chunk_weight >= CHUNK_MIN_WEIGHT
    end
    Stackify.get_transport.send_logs(chunk) if chunk.length > 0
    chunk_weight
  end
end
update_send_interval_task() click to toggle source
# File lib/stackify/msgs_queue.rb, line 84
# Builds the scheduled task handed to the worker.
#
# The task body sends pending chunks via #calculate_processed_msgs_count
# and feeds the resulting count into @send_interval.update_by_sent_num!,
# returning whatever that call returns. Its success condition accepts any
# result.
#
# @return [Stackify::ScheduleTask]
def update_send_interval_task
  always_successful = lambda { |_result| true }
  Stackify::ScheduleTask.new({ success_condition: always_successful }) do
    @send_interval.update_by_sent_num!(calculate_processed_msgs_count)
  end
end
wait_until_all_workers_will_add_msgs() click to toggle source
# File lib/stackify/msgs_queue.rb, line 76
# Blocks until Stackify reports no alive message-adding workers, sleeping
# DELAY_WAITING between checks.
#
# @send_interval is reset to 120 up front and grows by DELAY_WAITING per
# wait cycle.
# NOTE(review): this replaces whatever object start_worker stored in
# @send_interval with a plain number - confirm that is intended.
#
# @return [nil]
def wait_until_all_workers_will_add_msgs
  @send_interval = 120
  until Stackify.alive_adding_msg_workers.size <= 0
    @send_interval += DELAY_WAITING
    sleep(DELAY_WAITING)
  end
end