class LogStash::Codecs::RetriggerableTask
Constants
- SLEEP_FOR
Attributes
thread[R]
Public Class Methods
new(delay, listener)
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 8
#
# Sets up the countdown state for a task; no thread is created here.
#
# @param delay [Numeric] converted into a tick count by calculate_count
# @param listener [Object] stored as-is; #start calls +timeout+ on it, so it
#   must respond to that method
def initialize(delay, listener)
  @count = calculate_count(delay)
  @listener = listener
  # Remaining ticks; starts at the full count.
  @counter = Concurrent::AtomicFixnum.new(@count)
  # Flipped to true by #close and never reset in the visible code.
  @stopped = Concurrent::AtomicBoolean.new(false)
  # Single-permit semaphore shared between #retrigger and the worker thread.
  @semaphore = Concurrent::Semaphore.new(1)
end
Public Instance Methods
close()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 29
#
# Marks the task as stopped by setting the +@stopped+ flag to true.
# It only flips the flag; it does not touch +@thread+ directly.
def close
  @stopped.make_true
end
counter()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 33
#
# Current value of the tick counter.
#
# @return [Integer] whatever +@counter.value+ reports
def counter
  @counter.value
end
executing?()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 37
#
# True when the worker thread is alive and the countdown has reached zero
# (counter below 1).
#
# @return [Boolean, nil] falsy when no live thread exists
def executing?
  running? && counter < 1
end
pending?()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 41
#
# True when the worker thread is alive and ticks remain (counter above 0).
#
# @return [Boolean, nil] falsy when no live thread exists
def pending?
  running? && counter > 0
end
retrigger()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 16
#
# Restarts the countdown: resets the counter when a countdown is still
# pending, otherwise starts a new worker thread. Does nothing once the task
# has been stopped.
def retrigger
  return if stopped?

  # The worker drains the semaphore before calling the listener and releases
  # it afterwards (see #start), so this acquire waits for that call to end.
  @semaphore.acquire if executing?

  if pending?
    reset_counter
  else
    start
  end
end
Private Instance Methods
calculate_count(value)
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 47
#
# Converts a delay into a whole number of SLEEP_FOR-long ticks.
#
# @param value [Numeric] the delay, in the same unit as SLEEP_FOR
# @return [Integer] at least 1
def calculate_count(value)
  # in multiples of SLEEP_FOR (0.25) seconds
  # if delay is 10 seconds then count is 40
  # this only works when SLEEP_FOR is less than 1
  return 1 if value < SLEEP_FOR

  (value / SLEEP_FOR).floor
end
reset_counter()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 55
#
# Puts the tick counter back to the full count computed at construction.
def reset_counter
  @counter.value = @count
end
running?()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 59
#
# Whether a worker thread exists and is still alive.
#
# @return [Boolean, nil] nil when no thread has been created yet
def running?
  @thread && @thread.alive?
end
start()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 63
#
# Resets the counter and spawns the worker thread. The worker sleeps
# SLEEP_FOR per tick until the counter reaches zero or the task is stopped,
# then calls +timeout+ on the listener unless the task was stopped.
#
# @return [Thread] the newly created worker thread (also kept in +@thread+)
def start
  reset_counter
  @thread = Thread.new do
    while counter > 0
      break if stopped?

      sleep SLEEP_FOR
      @counter.decrement
    end

    # Take every permit so #retrigger's acquire waits while the listener runs.
    @semaphore.drain_permits
    begin
      @listener.timeout unless stopped?
    ensure
      # Release even when the listener raises; otherwise the semaphore would
      # stay drained after this thread dies.
      @semaphore.release
    end
  end
end
stopped?()
click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 78
#
# Whether the task has been stopped.
#
# @return [Boolean] current value of the +@stopped+ flag (set by #close)
def stopped?
  @stopped.value
end