class Stackify::MsgsQueue
Constants
- CHUNK_MIN_WEIGHT
- DELAY_WAITING
- ERROR_SIZE
- LOG_SIZE
Attributes
worker[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/stackify/msgs_queue.rb, line 12
# Builds the queue with the capacity taken from the Stackify configuration
# and immediately tries to start the background worker.
def initialize
  max_size = Stackify.configuration.queue_max_size
  super(max_size)
  start_worker
end
Public Instance Methods
add_msg(msg)
click to toggle source
# File lib/stackify/msgs_queue.rb, line 43
# Enqueues +msg+, restarting the worker first when it is no longer alive.
#
# Depending on the configured transport:
# * Stackify::DEFAULT - the push is wrapped in
#   Stackify::Utils.do_only_if_authorized_and_mode_is_on for the logging mode;
# * Stackify::UNIX_SOCKET / Stackify::AGENT_HTTP - the message is pushed directly;
# * any other value - nothing is pushed.
#
# NOTE(review): @worker is only assigned by start_worker when logging mode is
# on; if it is still nil here the first log line raises NoMethodError - confirm
# that callers never reach this method with logging disabled.
def add_msg(msg)
  Stackify.internal_log :debug, "[MsgsQueue] add_msg() Is worker <#{@worker.name}> alive? = #{@worker.alive?}"

  unless @worker.alive?
    start_worker
    Stackify.internal_log :debug, "[MsgsQueue] add_msg() Newly created worker <#{@worker.name}>"
  end

  synchronize do
    case Stackify.configuration.transport
    when Stackify::DEFAULT
      Stackify::Utils.do_only_if_authorized_and_mode_is_on(Stackify::MODES[:logging]) { old_push(msg) }
    when Stackify::UNIX_SOCKET, Stackify::AGENT_HTTP
      old_push(msg)
    end
  end
end
push_remained_msgs()
click to toggle source
# File lib/stackify/msgs_queue.rb, line 30
# Flushes whatever is still queued.
#
# Waits until no message-adding workers are reported alive, then, under the
# queue lock, calls Stackify.shutdown_all. If messages remain they are popped
# and sent through the transport in one call, and Stackify.status is set to
# the :terminated status. With an empty queue the status is left untouched.
def push_remained_msgs
  Stackify.internal_log :debug, "[MsgsQueue] push_remained_msgs() alive? = #{@worker.alive?}"
  wait_until_all_workers_will_add_msgs

  synchronize do
    Stackify.internal_log :info, '[MsgsQueue] All remained logs are going to be sent'
    Stackify.shutdown_all
    next unless length > 0

    leftovers = pop_all
    Stackify.get_transport.send_logs(leftovers)
    Stackify.status = Stackify::STATUSES[:terminated]
  end
end
start_worker()
click to toggle source
# File lib/stackify/msgs_queue.rb, line 19
# Creates a fresh send-interval object and worker, then schedules the
# periodic send task on that worker.
#
# When the logging mode is off nothing is created; a warning is written to
# the internal log instead.
def start_worker
  logging_on = Stackify::Utils.is_mode_on?(Stackify::MODES[:logging])
  return Stackify.internal_log(:warn, '[MsgsQueue]: Logging is disabled at configuration!') unless logging_on

  @send_interval = ScheduleDelay.new
  @worker = MsgsQueueWorker.new
  @worker.async_perform(@send_interval, update_send_interval_task)
end
Private Instance Methods
calculate_processed_msgs_count()
click to toggle source
# File lib/stackify/msgs_queue.rb, line 97
# Sends chunks until one comes back lighter than 50 and returns the sum of
# the values reported by push_one_chunk.
#
# At least one chunk is always attempted.
def calculate_processed_msgs_count
  total = 0
  chunk_was_full = true

  while chunk_was_full
    weight = push_one_chunk
    total += weight
    chunk_was_full = weight >= 50
  end

  total
end
pop_all()
click to toggle source
# File lib/stackify/msgs_queue.rb, line 66
# Empties the queue under the lock and returns the popped messages as an
# Array, in pop order.
def pop_all
  synchronize do
    drained = []
    drained << pop until empty?
    drained
  end
end
push_one_chunk()
click to toggle source
# File lib/stackify/msgs_queue.rb, line 108
# Pops messages into a single chunk and hands the chunk to the transport.
#
# Each popped message adds ERROR_SIZE to the chunk weight when the transport
# reports it as an error, LOG_SIZE otherwise. Popping stops when:
# * the queue is empty,
# * a message's epoch is later than the moment this call started, or
# * the accumulated weight has reached CHUNK_MIN_WEIGHT.
#
# The last condition used to compare the constant CHUNK_MIN_WEIGHT with a
# literal, which never changes while looping, so the accumulated weight was
# ignored and a chunk was never bounded by weight.
#
# Returns the weight of the chunk that was sent (0 when nothing was queued).
def push_one_chunk
  chunk_weight = 0
  chunk = []
  # Milliseconds, to be comparable with the transport's message epoch.
  started_at = Time.now.to_f * 1000

  synchronize do
    while length > 0
      msg = pop
      chunk << msg
      chunk_weight += Stackify.get_transport.has_error(msg) ? ERROR_SIZE : LOG_SIZE
      break if Stackify.get_transport.get_epoch(msg) > started_at || chunk_weight >= CHUNK_MIN_WEIGHT
    end

    Stackify.get_transport.send_logs(chunk) if chunk.length > 0
    chunk_weight
  end
end
update_send_interval_task()
click to toggle source
# File lib/stackify/msgs_queue.rb, line 84
# Builds the Stackify::ScheduleTask that the worker runs.
#
# The task sends the queued chunks, feeds the resulting count to
# @send_interval.update_by_sent_num! and yields whatever that call returns.
# Its success condition accepts every result.
def update_send_interval_task
  always_successful = ->(_result) { true }

  Stackify::ScheduleTask.new({ success_condition: always_successful }) do
    @send_interval.update_by_sent_num!(calculate_processed_msgs_count)
  end
end
wait_until_all_workers_will_add_msgs()
click to toggle source
# File lib/stackify/msgs_queue.rb, line 76
# Blocks, sleeping DELAY_WAITING between checks, until
# Stackify.alive_adding_msg_workers reports no workers.
#
# This method used to overwrite @send_interval with an Integer counter that
# nothing ever read. @send_interval holds the object the scheduled send task
# calls update_by_sent_num! on, so replacing it could make that task raise if
# it ran afterwards. The counter has been removed and @send_interval is left
# untouched.
def wait_until_all_workers_will_add_msgs
  sleep DELAY_WAITING while Stackify.alive_adding_msg_workers.size > 0
end