class Fluent::Test::InputTestDriver

Attributes

emit_streams[R]
event_streams[R]
expected_emits_length[RW]
run_timeout[RW]

Public Class Methods

new(klass, &block) click to toggle source
Calls superclass method Fluent::Test::TestDriver::new
# File lib/fluent/test/input_test.rb, line 24
def initialize(klass, &block)
  super(klass, &block)
  @emit_streams = []
  @event_streams = []
  @expects = nil
  # for checking only the number of emitted records during run
  @expected_emits_length = nil
  @run_timeout = 5
  @run_post_conditions = []
end

Public Instance Methods

emits() click to toggle source
# File lib/fluent/test/input_test.rb, line 48
def emits
  all = []
  @emit_streams.each {|tag,events|
    events.each {|time,record|
      all << [tag, time, record]
    }
  }
  all
end
events() click to toggle source
# File lib/fluent/test/input_test.rb, line 58
def events
  all = []
  @emit_streams.each {|tag,events|
    all.concat events
  }
  all
end
expect_emit(tag, time, record) click to toggle source
# File lib/fluent/test/input_test.rb, line 35
def expect_emit(tag, time, record)
  (@expects ||= []) << [tag, time, record]
  self
end
expected_emits() click to toggle source
# File lib/fluent/test/input_test.rb, line 40
def expected_emits
  @expects ||= []
end
records() click to toggle source
# File lib/fluent/test/input_test.rb, line 66
def records
  all = []
  @emit_streams.each {|tag,events|
    events.each {|time,record|
      all << record
    }
  }
  all
end
register_run_breaking_condition(&block) click to toggle source
# File lib/fluent/test/input_test.rb, line 82
def register_run_breaking_condition(&block)
  if block
    @run_breaking_conditions ||= []
    @run_breaking_conditions << block
  end
end
register_run_post_condition(&block) click to toggle source
# File lib/fluent/test/input_test.rb, line 76
def register_run_post_condition(&block)
  if block
    @run_post_conditions << block
  end
end
run(num_waits = 10, &block) click to toggle source
Calls superclass method Fluent::Test::TestDriver#run
# File lib/fluent/test/input_test.rb, line 112
def run(num_waits = 10, &block)
  m = method(:emit_stream)
  unless Engine.singleton_class.ancestors.include?(EmitStreamWrapper)
    Engine.singleton_class.prepend EmitStreamWrapper
  end
  Engine.emit_stream_callee = m
  unless instance.router.singleton_class.ancestors.include?(EmitStreamWrapper)
    instance.router.singleton_class.prepend EmitStreamWrapper
  end
  instance.router.emit_stream_callee = m

  super(num_waits) {
    block.call if block

    if @expected_emits_length || @expects || @run_post_conditions
      # counters for emits and emit_streams
      i, j = 0, 0

      # Events of expected length will be emitted at the end.
      max_length = @expected_emits_length
      max_length ||= @expects.length if @expects
      if max_length
        register_run_post_condition do
          i == max_length
        end
      end

      # Set running timeout to avoid infinite loop caused by some errors.
      started_at = Time.now
      register_run_breaking_condition do
        Time.now >= started_at + @run_timeout
      end

      until run_should_stop?
        if j >= @emit_streams.length
          sleep 0.01
          next
        end

        tag, events = @emit_streams[j]
        events.each do |time, record|
          if @expects
            assert_equal(@expects[i], [tag, time, record])
            assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime)
          end
          i += 1
        end
        j += 1
      end
      assert_equal(@expects.length, i) if @expects
    end
  }
  self
end
run_should_stop?() click to toggle source
# File lib/fluent/test/input_test.rb, line 89
def run_should_stop?
  # Should stop running if post conditions are not registered.
  return true unless @run_post_conditions

  # Should stop running if all of the post conditions are true.
  return true if @run_post_conditions.all? {|proc| proc.call }

  # Should stop running if any of the breaking conditions is true.
  # In this case, some post conditions may be not true.
  return true if @run_breaking_conditions && @run_breaking_conditions.any? {|proc| proc.call }

  false
end

Private Instance Methods

emit_stream(tag, es) click to toggle source
# File lib/fluent/test/input_test.rb, line 168
def emit_stream(tag, es)
  @event_streams << es
  @emit_streams << [tag, es.to_a]
end