class LogStash::Codecs::RetriggerableTask

Constants

SLEEP_FOR

Attributes

thread[R]

Public Class Methods

new(delay, listener) click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 8
# Builds a task that counts down and then notifies +listener+.
#
# @param delay [Numeric] delay handed to calculate_count to derive the
#   number of countdown ticks
# @param listener [#timeout] object whose +timeout+ method is invoked by
#   the worker thread once the countdown finishes
def initialize(delay, listener)
  @listener = listener
  # Number of ticks a full countdown lasts.
  @count = calculate_count(delay)
  # Live countdown value, starting at a full count.
  @counter = Concurrent::AtomicFixnum.new(0 + @count)
  # Single-permit semaphore shared by retrigger and the worker thread.
  @semaphore = Concurrent::Semaphore.new(1)
  # Set to true by close.
  @stopped = Concurrent::AtomicBoolean.new(false)
end

Public Instance Methods

close() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 29
# Marks the task as stopped by setting the stop flag to true.
# retrigger returns early and the worker loop breaks once the flag is set.
#
# @return whatever the stop flag's +make_true+ returns
def close
  @stopped.make_true
end
counter() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 33
# Current value of the countdown counter.
#
# @return the +value+ held by the counter object
def counter
  @counter.value
end
executing?() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 37
# True when the worker thread is alive and the countdown has run out
# (counter below 1).
#
# @return [Boolean, nil] falsy (nil or false) when the worker is not running
def executing?
  alive = running?
  alive && counter < 1
end
pending?() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 41
# True when the worker thread is alive and the countdown is still in
# progress (counter above 0).
#
# @return [Boolean, nil] falsy (nil or false) when the worker is not running
def pending?
  alive = running?
  alive && counter > 0
end
retrigger() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 16
# Restarts the countdown.
#
# Does nothing once the task has been stopped. While the countdown is
# still pending the counter is simply reset to its full count; otherwise
# a new worker thread is started.
#
# @return [nil] when stopped; otherwise the result of reset_counter or start
def retrigger
  return if stopped?

  # Worker is alive but its countdown has run out: take the semaphore
  # before going further (this call may block).
  @semaphore.acquire if executing?

  pending? ? reset_counter : start
end

Private Instance Methods

calculate_count(value) click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 47
# Converts a delay into a whole number of SLEEP_FOR-sized ticks.
#
# Per the original note, SLEEP_FOR is 0.25 seconds, so a delay of 10
# seconds yields a count of 40; the note also warns that this only works
# when SLEEP_FOR is less than 1.
#
# @param value [Numeric] the delay to convert
# @return [Integer] 1 when the delay is shorter than one tick, otherwise
#   the number of whole ticks that fit in the delay
def calculate_count(value)
  if value < SLEEP_FOR
    1
  else
    (value / SLEEP_FOR).floor
  end
end
reset_counter() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 55
# Restores the countdown counter to the full tick count computed at
# construction time.
#
# @return the value assigned to the counter
def reset_counter
  full_count = 0 + @count
  @counter.value = full_count
end
running?() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 59
# Whether a worker thread has been created and is still alive.
#
# @return [Boolean, nil] nil when no thread has been started yet,
#   otherwise the thread's +alive?+ status
def running?
  worker = @thread
  worker && worker.alive?
end
start() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 63
# Resets the countdown and spawns the worker thread.
#
# The worker sleeps SLEEP_FOR at a time, decrementing the counter after
# each sleep, until the counter reaches zero or the task is stopped. It
# then drains the semaphore's permits, calls the listener's +timeout+
# (skipped if the task was stopped), and releases a permit.
#
# @return [Thread] the newly created worker thread
def start
  reset_counter
  @thread = Thread.new do
    # Countdown phase: one tick per SLEEP_FOR, abandoned early on stop.
    while counter > 0 && !stopped?
      sleep SLEEP_FOR
      @counter.decrement
    end

    # Notification phase: hold no free permits while the listener runs,
    # then put one back.
    @semaphore.drain_permits
    @listener.timeout unless stopped?
    @semaphore.release
  end
end
stopped?() click to toggle source
# File lib/logstash/codecs/retriggerable_task.rb, line 78
# Whether close has been called on this task.
#
# @return the current +value+ of the stop flag
def stopped?
  @stopped.value
end