class Fluent::Plugin::CopyOutput

Attributes

ignore_errors[R]
ignore_if_prev_successes[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Plugin::MultiOutput::new
# File lib/fluent/plugin/out_copy.rb, line 32
def initialize
  super
  @ignore_errors = []
  @ignore_if_prev_successes = []
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::MultiOutput#configure
# File lib/fluent/plugin/out_copy.rb, line 38
def configure(conf)
  super

  @copy_proc = gen_copy_proc
  @stores.each_with_index { |store, i|
    if i == 0 && store.arg.include?('ignore_if_prev_success')
      raise Fluent::ConfigError, "ignore_if_prev_success must specify 2nd or later <store> directives"
    end
    @ignore_errors << (store.arg.include?('ignore_error'))
    @ignore_if_prev_successes << (store.arg.include?('ignore_if_prev_success'))
  }
  if @ignore_errors.uniq.size == 1 && @ignore_errors.include?(true) && !@ignore_if_prev_successes.include?(true)
    log.warn "ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified. Is this intended?"
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_copy.rb, line 54
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_copy.rb, line 58
def process(tag, es)
  unless es.repeatable?
    m = Fluent::MultiEventStream.new
    es.each {|time,record|
      m.add(time, record)
    }
    es = m
  end
  success = Array.new(outputs.size)
  outputs.each_with_index do |output, i|
    begin
      if i > 0 && success[i - 1] && @ignore_if_prev_successes[i]
        log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i
      else
        output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
        success[i] = true
      end
    rescue => e
      if @ignore_errors[i]
        log.error "ignore emit error in #{output.plugin_id}", error: e
      else
        raise e
      end
    end
  end
end

Private Instance Methods

gen_copy_proc() click to toggle source
# File lib/fluent/plugin/out_copy.rb, line 87
def gen_copy_proc
  @copy_mode = :shallow if @deep_copy

  case @copy_mode
  when :no_copy
    nil
  when :shallow
    Proc.new { |es| es.dup }
  when :deep
    Proc.new { |es|
      packer = Fluent::MessagePackFactory.msgpack_packer
      times = []
      records = []
      es.each { |time, record|
        times << time
        packer.pack(record)
      }
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(packer.full_pack) { |record|
        records << record
      }
      Fluent::MultiEventStream.new(times, records)
    }
  when :marshal
    Proc.new { |es|
      new_es = Fluent::MultiEventStream.new
      es.each { |time, record|
        new_es.add(time, Marshal.load(Marshal.dump(record)))
      }
      new_es
    }
  end
end