class Fluent::SendmailInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sendmail.rb, line 13
def configure(conf)
  super
  @delivers = LruRedux::ThreadSafeCache.new(@lrucache_size)
  if @path_cache_file != nil
    if not File.exists?(@path_cache_file)
      File.open(@path_cache_file, "w+"){|cache_file|
        cache_file.puts('{}')
      }
    end
    if not File.readable?(@path_cache_file)
      raise ConfigError, "cache file exists but not readable."
    end
    if not File.writable?(@path_cache_file)
      raise Fluent::ConfigError, "cache file not writable."
    end
    File.open(@path_cache_file, "r") {|cache_file|
      line = cache_file.read()
      data = JSON.parse(line)
      data.each{|k, v|
        @delivers[k] = SendmailLog.new(v['time'], v['from_line'], v['nrcpts'])
      }
    }
  end
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 38
def configure_parser(conf)
  @parser = SendmailParser.new(conf)
end
queued(es, deliveryid, time, to_line) click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 126
def queued(es, deliveryid, time, to_line)
  from_line = @delivers[deliveryid].from_line
  record = from_line.merge(to_line)
  delay = to_line["delay_in_sec"]
  # when a queue is expired, the mail will be bounced
  if delay >= queuereturn
    record["canonical_status"] = "bounced"
    sent(es, deliveryid, time, record)
    return
  end
  record = from_line.merge(to_line)
  es.add(time, record)
end
receive_lines(lines) click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 55
def receive_lines(lines)
  es = Fluent::MultiEventStream.new
  lines.each {|line|
    begin
      line.chomp!  # remove \n
      record = parse_line(line)
      if record.nil?
        next
      end

      type   = record["type"]
      mta    = record["mta"]
      qid    = record["qid"]
      time   = record["time"]
      # a qid is not uniq worldwide.
      # make delivery id uniq even if multiple MTA"s log are mixed.
      deliveryid = mta + qid
      type   = record["type"]
      # remove unnecessary key `type"
      record.delete("type")

      case type
      when "from"
        # new log
        if @delivers.has_key?(deliveryid)
          $log.warn "duplicate sender line found. " + line.dump
        else
          @delivers[deliveryid] = SendmailLog.new(time, record)
        end
      when "to"
        if @delivers.has_key?(deliveryid)
          case record["status_canonical"]
          when "sent", "sent_local", "bounced"
            sent(es, deliveryid, time, record)
          when "deferred"
            queued(es, deliveryid, time, record)
          when "other"
            $log.warn "cannot find this kind of delivery status: " + line.dump
          end
        else
          # cannot find any 'from' line corresponded to the 'to' line
        end
      end
    rescue
      $log.warn line.dump, :error=>$!.to_s
      raise
    end
  }

  unless es.empty?
    begin
      Fluent::Engine.emit_stream(@tag, es)
    rescue
      # ignore errors. Engine shows logs and backtraces.
      raise
    end
  end
end
sent(es, deliveryid, time, to_line) click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 114
def sent(es, deliveryid, time, to_line)
  from_line = @delivers[deliveryid].from_line
  record = from_line.merge(to_line)
  es.add(time, record)
  nrcpts = @delivers[deliveryid].nrcpts
  @delivers[deliveryid].nrcpts -= to_line["to"].length
  # all done
  if @delivers[deliveryid].nrcpts <= 0
    @delivers.delete(deliveryid)
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sendmail.rb, line 42
def shutdown
  super
  if @path_cache_file != nil
    data = {}
    @delivers.each{|k, v|
      data[k] = v.to_json
    }
    File.open(@path_cache_file, "w+") {|cache_file|
      cache_file.puts(data.to_json)
    }
  end
end