class QueueRunner
Constants
- RetryInterval
Public Class Methods
new()
click to toggle source
# File lib/rubymta/queue_runner.rb, line 21
# Creates a queue runner. The constructor takes no arguments and sets up no
# state of its own.
def initialize; end
Public Instance Methods
deliver_and_save_status(mail,domain,parcels)
click to toggle source
# File lib/rubymta/queue_runner.rb, line 107
# Delivers one item of mail to all recipients (parcels) of a single domain and
# records the outcome on the parcels.
#
# mail    - the item of mail; this method reads mail[:data][:text],
#           mail[:rcptto] and mail[:mail_id], and calls mail.update_parcels
# domain  - the destination domain shared by every parcel in +parcels+
# parcels - array of parcel hashes; parcels.first[:delivery] picks the route:
#           "local"  -> LMTP to localhost on LocalLMTPPort
#           "remote" -> SMTP to the first MX that accepts a connection
#           "none"   -> nothing is sent
#
# The methods lmtp_delivery and smtp_delivery change the parcel status upon
# delivery. Any StandardError raised while delivering is logged and turned
# into a "441 4.0.0 Rescue ..." status on the parcels.
#
# NOTE(review): the failure paths call mark_parcels but not
# mail.update_parcels -- confirm the marks are persisted elsewhere.
def deliver_and_save_status(mail,domain,parcels)
  # forget any handle left behind by an earlier delivery, so the ensure
  # clause below never sends QUIT on a connection this call did not open
  @connection = nil

  # get the certificates, if any; they're needed for STARTTLS
  # (kept in globals, as before, in case other code reads them)
  $prv = PrivateKey ? OpenSSL::PKey::RSA.new(File.read(PrivateKey)) : nil
  $crt = Certificate ? OpenSSL::X509::Certificate.new(File.read(Certificate)) : nil

  # establish an SSL context
  $ctx = OpenSSL::SSL::SSLContext.new
  $ctx.key = $prv
  $ctx.cert = $crt

  begin
    case parcels.first[:delivery]
    #==================================================
    when "local"
      begin
        ssl_socket = TCPSocket.open('localhost',LocalLMTPPort)
        @connection = OpenSSL::SSL::SSLSocket.new(ssl_socket, $ctx)
        if defined?(SpamAssassinInstalled) && SpamAssassinInstalled
          # pipe the message through spamc and keep its (possibly rewritten) output
          text = mail[:data][:text].join(CRLF)
          text, _stderr, _status = Open3.capture3("spamc", :stdin_data=>text)
          mail[:data][:text] = text.split(CRLF)
        end
        lmtp_delivery('localhost',LocalLMTPPort,mail,domain,parcels)
        mail.update_parcels(parcels)
      rescue Errno::ECONNREFUSED => e
        LOG.info(@mail_id) {"Connection to localhost failed: #{e}"}
        mark_parcels(parcels, "441 4.0.0 Connection to localhost failed")
      end
    #==================================================
    when "remote"
      # this looks through the list of MXs and finds the
      # first one that can communicate
      mail[:rcptto].each do |rcptto|
        next unless rcptto[:domain]==domain
        rcptto[:mxs].each do |_preference,pairs|
          pairs.each do |mx,_ip|
            begin
              # open the connection
              ssl_socket = TCPSocket.open(mx,RemoteSMTPPort)
              @connection = OpenSSL::SSL::SSLSocket.new(ssl_socket, $ctx)
              smtp_delivery(mx, RemoteSMTPPort, mail, domain, parcels)
              mail.update_parcels(parcels)
              return
            rescue Errno::ETIMEDOUT
              LOG.info(@mail_id) {"Service for #{mx} not available (timeout)"}
            rescue Errno::ECONNREFUSED
              LOG.info(@mail_id) {"Service for #{mx} not available (refused)"}
            end
          end
        end
        # delivery was remote, and no MX was connectable; this is reached only
        # after every preference group has been tried
        mark_parcels(parcels, "441 4.0.0 No MX for <#{domain}> has an operational mail server")
      end
    #==================================================
    when 'none'
      # just ignore the email--it will not be delivered
    else
      # we didn't program a delivery option, so it got here
      mark_parcels(parcels, "500 5.0.0 Delivery option '#{parcels.first[:delivery]}' not supported")
    end
  rescue => e
    LOG.info(@mail_id) {"Rescue #{e.inspect}"}
    e.backtrace.each { |line| LOG.fatal(mail[:mail_id]) { line } }
    mark_parcels(parcels, "441 4.0.0 Rescue #{e.inspect}")
  end
ensure
  if @connection
    begin
      send_text("QUIT")
      recv_text # the reply to QUIT is not needed
    rescue StandardError => e
      # a failed QUIT must not prevent the socket from being closed
      LOG.info(@mail_id) {"Rescue #{e.inspect}"}
    ensure
      @connection.close
      @connection = nil
    end
  end
end
lmtp_delivery(host, port, mail, domain, parcels)
click to toggle source
domain (for local) is 'localhost', and parcels is an array of hashes (recipients) – the single key is the domain to which to send the email – since all recipients are in the same domain, we can send only one email with all recipients named in RCPT TOs – this delivery is for Dovecot, so we believe the domain is 'ServerName'
# File lib/rubymta/queue_runner.rb, line 340
# Runs one LMTP conversation over @connection (already opened by the caller)
# and marks every parcel with the reply that decided its fate.
#
# host, port - accepted for symmetry with smtp_delivery; not used in the body
# mail       - item of mail; reads mail[:mail_id], mail[:local_hostname],
#              mail[:mailfrom][:url] and mail[:data][:text] (array of lines)
# domain     - not used in the body
# parcels    - array of recipient hashes; each needs :to_url
#
# Returns nil. Results are reported through mark_parcels.
def lmtp_delivery(host, port, mail, domain, parcels)
  LOG.info(@mail_id) {"Beginning delivery of #{mail[:mail_id]} to Dovecot at #{ServerName}"}

  # receive the server's welcome message
  ok, lines = recv_text
  return mark_parcels(parcels, lines) if ok!='2'

  # send the LHLO
  send_text("LHLO #{mail[:local_hostname]}")
  ok, lines = recv_text
  return mark_parcels(parcels, lines) if ok!='2'

  # MAIL FROM
  send_text("MAIL FROM:<#{mail[:mailfrom][:url]}>")
  ok, lines = recv_text
  return mark_parcels(parcels, lines) if ok!='2'

  # RCPT TO -- a rejected recipient is marked right away and the others
  # continue; only accepted recipients take part in the rest of the dialog
  accepted = []
  parcels.each do |parcel|
    send_text("RCPT TO:<#{parcel[:to_url]}>")
    ok, lines = recv_text
    if ok=='2'
      accepted << parcel
    else
      mark_parcels([parcel], lines)
    end
  end
  # nothing left to deliver; every parcel already carries its rejection
  return nil if accepted.empty?

  # DATA -- send the email
  send_text("DATA")
  ok, lines = recv_text
  return mark_parcels(accepted, lines) if ok!='3'
  LOG.info(@mail_id) {"<- (data)"} if LogQueueRunnerConversation
  mail[:data][:text].each { |line| send_text(line, :data) }

  # send the end of the message prompt
  send_text(".", :data)

  # LMTP answers once per accepted recipient, in RCPT order, so every one of
  # those replies has to be read (not just the first)
  accepted.each do |parcel|
    ok, lines = recv_text
    mark_parcels([parcel], lines)
    if ok=='2'
      LOG.info(@mail_id) {"Mail for #{parcel[:to_url]} delivered locally"}
    else
      # recv_text may hand back a bare String on timeout, hence Array()
      LOG.info(@mail_id) {"Mail for #{parcel[:to_url]} failed delivery locally, #{Array(lines).last}"}
    end
  end
  nil
end
mark_parcels(parcels, responses)
click to toggle source
# File lib/rubymta/queue_runner.rb, line 184
# Stamps each parcel with the outcome described by a server response.
#
# parcels   - array of parcel hashes; :delivery_at, :delivery_msg and
#             :retry_at are overwritten on each one
# responses - a response String, or an Array of response lines of which the
#             last one is used
#
# A response whose first character is '4' is treated as temporary: the parcel
# gets no :delivery_at and a :retry_at of now + RetryInterval. A missing or
# empty response carries no status at all, so it is treated as temporary too
# rather than being recorded as a final result. Anything else is final:
# :delivery_at is set to now and :retry_at is cleared.
#
# Returns nil.
def mark_parcels(parcels, responses)
  response = (responses.kind_of?(Array) ? responses.last : responses).to_s
  temporary = response.empty? || response.start_with?('4')
  # one clock reading, so every parcel in the batch gets the same timestamps
  now = Time.now
  parcels.each do |parcel|
    parcel[:delivery_at] = temporary ? nil : now
    parcel[:delivery_msg] = response
    parcel[:retry_at] = temporary ? now + RetryInterval : nil
  end
  nil
end
recv_text()
click to toggle source
receive a (possibly multi-line) reply from the server
# File lib/rubymta/queue_runner.rb, line 39
# Reads one complete reply from @connection. Lines with '-' in the fourth
# column are continuation lines; reading stops at the first line without it.
#
# Returns two values: the first character of the last line (the reply class,
# e.g. '2') and the Array of reply lines with line endings removed.
#
# The second value is always an Array, including in the two synthetic cases:
#   * nothing arrives within QueueRunnerTimeout seconds -> '5' and a 500 line
#   * the peer closes the connection before the reply is complete -> '4' and
#     a 421 line, so the caller sees a temporary failure instead of an empty
#     status
def recv_text
  lines = []
  Timeout.timeout(QueueRunnerTimeout) do
    loop do
      raw = @connection.gets
      # gets returns nil at end of stream: the peer hung up mid-reply
      return '4', ["421 4.4.2 Connection closed by server"] if raw.nil?
      line = raw.chomp
      lines << line
      LOG.info((@mail_id)) {" -> %s"%line} if LogQueueRunnerConversation
      break unless line[3]=='-'
    end
  end
  lines.each { |reply| puts " -> #{reply.inspect}" } if DisplayQueueRunnerDialog
  return lines.last[0], lines
rescue Timeout::Error
  return '5', ["500 5.0.0 No data received after #{QueueRunnerTimeout} seconds (Time Out)"]
end
run_queue()
click to toggle source
# File lib/rubymta/queue_runner.rb, line 60
# Works through the parcels that are due for delivery: selects them from
# S3DB[:parcels], groups them by mail id and destination domain, and hands
# each group to deliver_and_save_status. The selection is repeated until it
# comes back empty or the sanity counter runs out.
#
# Returns nil. Start and finish are always logged.
def run_queue
  @mail_id = nil
  LOG.info(Time.now.strftime("%Y-%m-%d %H:%M:%S")) {"Queue runner started"}
  n=3 # used for a sanity check
  while true
    # sqlite3 has a bug: "<=" doesn't work with time, ex. "retry_at<='#{Time.now}'"
    # we have to add 1 second and use "<"; ex. "retry_at<'#{Time.now+1}'"
    pending = S3DB[:parcels].where(Sequel.lit("(delivery<>'none') and (delivery_at is null) and ((retry_at is null) or (retry_at<'#{Time.now + 1}'))")).all
    return if pending.empty?

    # aggregate the emails by destination domain:
    #   { mail_id => { domain => { to_url => parcel } } }
    deliver = {}
    pending.each do |parcel|
      next if parcel[:delivery]=='none' || parcel[:delivery_at]
      by_domain = deliver[parcel[:mail_id]] ||= {}
      recipients = by_domain[parcel[:to_url].split('@')[1]] ||= {}
      recipients[parcel[:to_url]] = parcel
    end

    # send mail to each domain--success or failure will be
    # handled in the respective mail routine
    deliver.each do |mail_id, domains|
      # each mail id occurs once in +deliver+, so load its queue file afresh;
      # a missing file affects only this mail, not the ones that follow
      mail = ItemOfMail::retrieve_mail_from_queue_folder(mail_id)
      if mail.nil?
        LOG.info(Time.now.strftime("%Y-%m-%d %H:%M:%S")) {"Skipping mail #{mail_id} because there's no associated queue file"}
        next
      end
      domains.each do |domain, recipients|
        @mail_id = mail[:mail_id]
        deliver_and_save_status(mail, domain, recipients.values)
        @mail_id = nil
      end
    end

    if (n-=1)<0
      LOG.info(Time.now.strftime("%Y-%m-%d %H:%M:%S")) {"In QueueRunner, the loop repeated 3 times. Is somthing wrong?"}
      return nil
    end
  end
ensure
  LOG.info(Time.now.strftime("%Y-%m-%d %H:%M:%S")) {"Queue runner finished"}
end
send_local_email(from, to, subject, text)
click to toggle source
to send an alert email to a registered user – this just delivers the message to dovecot with no processing – the caller is responsible to provide valid arguments
# File lib/rubymta/queue_runner.rb, line 395
# Sends a single message to one recipient over LMTP on localhost
# (LocalLMTPPort), building the headers from the arguments.
#
# from, to - addresses, placed inside <> in the envelope and the headers
# subject  - text of the Subject header
# text     - message body; split on "\n" into lines
#
# The arguments are used as given -- the caller is responsible for providing
# valid values.
#
# Returns what recv_text returned for the step that ended the dialog: the
# reply class character and the reply lines. The connection is always closed
# and @connection is cleared before returning.
def send_local_email(from, to, subject, text)
  @connection = nil

  # open connection
  ssl_socket = TCPSocket.open('localhost',LocalLMTPPort)
  @connection = OpenSSL::SSL::SSLSocket.new(ssl_socket)

  # receive the server's welcome message
  ok, lines = recv_text
  return ok, lines if ok!='2'

  # LHLO, MAIL FROM and RCPT TO each need a 2xx reply to continue
  ["LHLO admin", "MAIL FROM:<#{from}>", "RCPT TO:<#{to}>"].each do |command|
    send_text(command)
    ok, lines = recv_text
    return ok, lines if ok!='2'
  end

  # DATA -- send the email
  send_text("DATA")
  ok, lines = recv_text
  return ok, lines if ok!='3'

  # headers, the empty line that separates them from the body, then the body
  message = [
    "To: <#{to}>",
    "From: <#{from}>",
    "Subject: #{subject}",
    "Date: #{Time.now.strftime("%a, %d %b %Y %H:%M:%S %z")}",
    ""
  ] + text.to_s.split("\n")
  message.each do |line|
    # a line starting with "." gets an extra "." so that it cannot be taken
    # for the end-of-message marker
    line = ".#{line}" if line.start_with?(".")
    send_text(line, :data)
  end

  # send the end of the message prompt
  send_text(".", :data)

  # get the response from DoveCot
  return recv_text
ensure
  if @connection
    @connection.close
    # do not leave a closed socket behind for other methods to find
    @connection = nil
  end
end
send_text(text,echo=:command)
click to toggle source
send a line (or an array of lines) of text to the server
# File lib/rubymta/queue_runner.rb, line 25
# Writes text to @connection, appending CRLF to every line.
#
# text - a String, or an Array of Strings that are written one after another
# echo - :command (default) lets the line be logged when
#        LogQueueRunnerConversation is on; any other value (the callers pass
#        :data for message content) keeps it out of the log
#
# When DisplayQueueRunnerDialog is on, the text is also printed to stdout.
def send_text(text,echo=:command)
  puts "<- #{text.inspect}" if DisplayQueueRunnerDialog
  outgoing = text.kind_of?(Array) ? text : [text]
  outgoing.each do |line|
    @connection.write(line+CRLF)
    # log the line that was just written, not the whole argument
    LOG.info(@mail_id) {"<- %s"%line} if LogQueueRunnerConversation && echo==:command
  end
end
smtp_delivery(host, port, mail, domain, parcels, username=nil, password=nil)
click to toggle source
SAMPLE SMTP TRANSFER <- 220 2.0.0 mail.xyz.com ESMTP Xyz, LLC 0.01 THU, 24 NOV 2016 20:22:39 +0000
-> EHLO foo.com
<- 250-2.0.0 mail.xyz.com Hello foo.com at 213.33.76.136 <- 250-AUTH PLAIN <- 250-STARTTLS <- 250 HELP
# File lib/rubymta/queue_runner.rb, line 231
# Runs one SMTP conversation over @connection (already opened by the caller)
# and marks every parcel with the reply that decided its fate.
#
# host     - name of the remote server; used in the log only
# port     - not used in the body
# mail     - item of mail; reads mail[:mail_id], mail[:local_hostname],
#            mail[:mailfrom][:url] and mail[:data][:text] (array of lines)
# domain   - not used in the body
# parcels  - array of recipient hashes; each needs :to_url
# username, password - when username is given, AUTH PLAIN is attempted
#
# STARTTLS is used when the EHLO reply advertises it. Returns nil. Results
# are reported through mark_parcels.
def smtp_delivery(host, port, mail, domain, parcels, username=nil, password=nil)
  LOG.info(@mail_id) {"Beginning delivery of #{mail[:mail_id]} to remote server at #{host}"}

  # receive the server's welcome message
  ok, lines = recv_text
  return mark_parcels(parcels, lines) if ok!='2'

  # send the EHLO
  send_text("EHLO #{mail[:local_hostname]}")
  ok, lines = recv_text
  return mark_parcels(parcels, lines) if ok!='2'

  # check for STARTTLS supported by server
  if Array(lines).any? { |line| line.include?("STARTTLS") }
    send_text("STARTTLS")
    ok, lines = recv_text
    return mark_parcels(parcels, lines) if ok!='2'

    # enable TLS
    @connection.connect
    LOG.info(@mail_id) {"<-> (handshake)"}

    # the server forgets the first EHLO once TLS is up, so greet it again
    send_text("EHLO #{mail[:local_hostname]}")
    ok, lines = recv_text
    return mark_parcels(parcels, lines) if ok!='2'
  end

  # AUTH PLAIN -- log onto server
  if username
    user_pass = Base64::encode64("\0#{username}\0#{password}").chomp
    send_text("AUTH PLAIN #{user_pass}")
    ok, lines = recv_text
    return mark_parcels(parcels, lines) if ok!='2'
  end

  # MAIL FROM
  send_text("MAIL FROM:<#{mail[:mailfrom][:url]}>")
  ok, lines = recv_text
  return mark_parcels(parcels, lines) if ok!='2'

  # RCPT TO -- if there's a problem, we mark this parcel (recipient) only and
  # keep processing the others; rejected recipients keep their RCPT reply
  accepted = []
  parcels.each do |parcel|
    send_text("RCPT TO:<#{parcel[:to_url]}>")
    ok, lines = recv_text
    if ok=='2'
      accepted << parcel
    else
      mark_parcels([parcel], lines)
    end
  end
  # nothing left to deliver; every parcel already carries its rejection
  return nil if accepted.empty?

  # DATA -- send the email
  send_text("DATA")
  ok, lines = recv_text
  return mark_parcels(accepted, lines) if ok!='3'
  # message lines are sent with :data and so are not logged one by one;
  # this placeholder stands in for them in the conversation log
  LOG.info(@mail_id) {"<- (data)"} if LogQueueRunnerConversation
  mail[:data][:text].each { |line| send_text(line, :data) }

  # send the end of the message prompt
  send_text(".", :data)

  # get one final message for all accepted parcels (recipients)
  ok, lines = recv_text
  mark_parcels(accepted, lines)
  if ok=='2'
    LOG.info(@mail_id) {"Mail for #{accepted[0][:to_url]}, et.al. delivered remotely"}
  else
    # recv_text may hand back a bare String on timeout, hence Array()
    LOG.info(@mail_id) {"Mail for #{accepted[0][:to_url]}, et.al. failed delivery remotely, #{Array(lines).last}"}
  end
  nil
end