class Fluent::SendmailInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sendmail.rb, line 13 def configure(conf) super @delivers = LruRedux::ThreadSafeCache.new(@lrucache_size) if @path_cache_file != nil if not File.exists?(@path_cache_file) File.open(@path_cache_file, "w+"){|cache_file| cache_file.puts('{}') } end if not File.readable?(@path_cache_file) raise ConfigError, "cache file exists but not readable." end if not File.writable?(@path_cache_file) raise Fluent::ConfigError, "cache file not writable." end File.open(@path_cache_file, "r") {|cache_file| line = cache_file.read() data = JSON.parse(line) data.each{|k, v| @delivers[k] = SendmailLog.new(v['time'], v['from_line'], v['nrcpts']) } } end end
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 38 def configure_parser(conf) @parser = SendmailParser.new(conf) end
queued(es, deliveryid, time, to_line)
click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 126 def queued(es, deliveryid, time, to_line) from_line = @delivers[deliveryid].from_line record = from_line.merge(to_line) delay = to_line["delay_in_sec"] # when a queue is expired, the mail will be bounced if delay >= queuereturn record["canonical_status"] = "bounced" sent(es, deliveryid, time, record) return end record = from_line.merge(to_line) es.add(time, record) end
receive_lines(lines)
click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 55 def receive_lines(lines) es = Fluent::MultiEventStream.new lines.each {|line| begin line.chomp! # remove \n record = parse_line(line) if record.nil? next end type = record["type"] mta = record["mta"] qid = record["qid"] time = record["time"] # a qid is not uniq worldwide. # make delivery id uniq even if multiple MTA"s log are mixed. deliveryid = mta + qid type = record["type"] # remove unnecessary key `type" record.delete("type") case type when "from" # new log if @delivers.has_key?(deliveryid) $log.warn "duplicate sender line found. " + line.dump else @delivers[deliveryid] = SendmailLog.new(time, record) end when "to" if @delivers.has_key?(deliveryid) case record["status_canonical"] when "sent", "sent_local", "bounced" sent(es, deliveryid, time, record) when "deferred" queued(es, deliveryid, time, record) when "other" $log.warn "cannot find this kind of delivery status: " + line.dump end else # cannot find any 'from' line corresponded to the 'to' line end end rescue $log.warn line.dump, :error=>$!.to_s raise end } unless es.empty? begin Fluent::Engine.emit_stream(@tag, es) rescue # ignore errors. Engine shows logs and backtraces. raise end end end
sent(es, deliveryid, time, to_line)
click to toggle source
# File lib/fluent/plugin/in_sendmail.rb, line 114 def sent(es, deliveryid, time, to_line) from_line = @delivers[deliveryid].from_line record = from_line.merge(to_line) es.add(time, record) nrcpts = @delivers[deliveryid].nrcpts @delivers[deliveryid].nrcpts -= to_line["to"].length # all done if @delivers[deliveryid].nrcpts <= 0 @delivers.delete(deliveryid) end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sendmail.rb, line 42 def shutdown super if @path_cache_file != nil data = {} @delivers.each{|k, v| data[k] = v.to_json } File.open(@path_cache_file, "w+") {|cache_file| cache_file.puts(data.to_json) } end end