class Fluent::DummyDataProducerInput

Attributes

dummydata[R]

X: number dummydataX {“field1”:“data1”,“field2”:“data2”}

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dummydata_producer.rb, line 24
def configure(conf)
  super

  @increment_value = 0

  @dummydata = []
  re = /^dummydata(\d+)$/
  conf.keys.select{|key| key =~ re}.sort{|a,b| re.match(a)[1].to_i <=> re.match(b)[1].to_i}.each do |key|
    @dummydata.push(JSON.parse(conf[key]))
  end

  if @dummydata.size < 1
    raise Fluent::ConfigError, "no one dummydata exists"
  end
  @dummydata_index = 0
end
generate() click to toggle source
# File lib/fluent/plugin/in_dummydata_producer.rb, line 52
def generate
  d = @dummydata[@dummydata_index]
  unless d
    @dummydata_index = 0
    d = @dummydata[0]
  end
  @dummydata_index += 1
  d = d.dup
  if @auto_increment_key
    d[@auto_increment_key] = @increment_value
    @increment_value += 1
  end
  d
end
run() click to toggle source
# File lib/fluent/plugin/in_dummydata_producer.rb, line 67
def run
  batch_num = (@rate / 9).to_i + 1
  while @running
    current_time = Fluent::Engine.now
    rate_count = 0

    while @running && rate_count < @rate && Fluent::Engine.now == current_time
      batch_num.times do
        Fluent::Engine.emit(@tag, Fluent::Engine.now, generate())
      end
      rate_count += batch_num
      sleep 0.1
    end
    # wait for next second
    while @running && Fluent::Engine.now == current_time
      sleep 0.04
    end
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_dummydata_producer.rb, line 47
def shutdown
  @running = false
  @producer.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dummydata_producer.rb, line 41
def start
  super
  @running = true
  @producer = Thread.new(&method(:run))
end