#!/usr/bin/python3.6 -s
# GPL. (C) 2025 Paolo Patruno.

# This program is free software; you can redistribute it and/or modify 
# it under the terms of the GNU General Public License as published by 
# the Free Software Foundation; either version 2 of the License, or 
# (at your option) any later version. 
# 
# This program is distributed in the hope that it will be useful, 
# but WITHOUT ANY WARRANTY; without even the implied warranty of 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
# GNU General Public License for more details. 
# 
# You should have received a copy of the GNU General Public License 
# along with this program; if not, write to the Free Software 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
# 

import os
os.environ['DJANGO_SETTINGS_MODULE'] = 'rmap.settings'
import django
django.setup()

from rmap import daemon
import pika
import rmap.settings
from rmap import schedule
import datetime
import subprocess,os,time
import tempfile,datetime
import logging,logging.handlers
import fasteners

user=rmap.settings.amqpuser
password=rmap.settings.amqppassword
host="localhost"
exchange=rmap.settings.exchangecomposereportd
routing_key="composereportd"

composereportd = daemon.Daemon(
        stdin="/dev/null",
        stdout=rmap.settings.logfilecomposereportd,
        stderr=rmap.settings.errfilecomposereportd,
        pidfile=rmap.settings.lockfilecomposereportd,
        user=rmap.settings.usercomposereportd,
        group=rmap.settings.groupcomposereportd)

def compute(starttime,endtime,compose):

        logging.info("start/end time: %s %s" % (starttime.isoformat(' '),endtime.isoformat(' ')))
        
        for mymeta in rmap.settings.sample_measurements:
            
            variable_list= mymeta["var"]
            level        = "%s,%s,%s,%s" % tuple(("" if v is None else str(v) for v in mymeta["level"]))
            timerange    = "%s,%s,%s" % tuple(("" if v is None else str(v) for v in mymeta["trange"]))

            logging.info(variable_list)
            logging.info(level)
            logging.info(timerange)
            totalbody=b""
            
            try:
                #(fd, filename) = tempfile.mkstemp()
                filename="-"


                #print "v7d_transform"+" --input-format"+" dba"+" --output-format"+" BUFR"+\
                #    " --variable-list"+" '"+variable_list+"'"," --level "," '"+level+"'"," --timerange "+" '"+timerange+"'",\
                #    " --start-date"+" '"+starttime.isoformat(' ')+"'" +\
                #    " --end-date"+" '"+endtime.isoformat(' ')+"'"+\
                #    " --comp-start"+" '"+starttime.isoformat(' ')+"'" +\
                #    " --comp-step"+" '0000000000 00:15:00.000'"," --comp-frac-valid"," '.002'"+\
                #    " --comp-stat-proc"+" '254:0'"+" '"+rmap.settings.dsnsample_fixed+"' "+\
                #    filename

                #logging.info( "v7d_transform"+" --input-format"+" dba"+" --output-format"+" BUFR"+\
                #              " --variable-list"+" '"+variable_list+"'"," --level "," '"+level+"'"," --timerange "+" '"+timerange+"'",\
                #              " --start-date"+" '"+starttime.replace(microsecond=0).isoformat(' ')+"'" +\
                #              " --end-date"+" '"+endtime.replace(microsecond=0).isoformat(' ')+"'"+\
                #              " --comp-start"+" '"+starttime.replace(microsecond=0).isoformat(' ')+"'" +\
                #              " --comp-step"+" '0000000000 00:15:00.000'"," --comp-frac-valid"," '.002'"+\
                #              " --comp-stat-proc"+" '254:0'"+" '"+rmap.settings.dsnsample_fixed+"' "+\
                #              filename)
                
                #compute 15min mean  from instantaneous values
                #compose.procs=[subprocess.Popen(["v7d_transform","--input-format","dba","--output-format","BUFR",
                #                              "--variable-list",variable_list,"--level",level,"--timerange",timerange,
                #                              "--start-date",starttime.isoformat(' ') ,
                #                              "--end-date",endtime.isoformat(' '),
                #                              "--comp-start",starttime.isoformat(' ') ,
                #                              "--comp-step","0000000000 00:15:00.000","--comp-frac-valid",".002",
                #                              "--comp-stat-proc","254:0",
                #                              rmap.settings.dsnsample_fixed,
                #                              filename],
                #                             stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"}
                #)]

                #body,outerr=compose.procs[0].communicate()

                #status=compose.procs[0].wait()
                #if status != 0:
                #    logging.error("There were some errors executing v7d_transform: %s %s " % (status,outerr))

                #else:

                #    totalbody+=body


                # work on parameter requested for 60 sec accumulation 
                if mymeta["trange"][0] == 1 and mymeta["trange"][1] == 0 and mymeta["trange"][2] == 60 :

                    logging.info("sample-> report")
                    logging.info("cumulate 60sec a cumulate 15'")
                    command=["v7d_transform","--input-format","dba","--output-format","BUFR",
                                              "--variable-list",variable_list,"--level",level,"--timerange",'1,0,60',
                                              "--start-date",starttime.isoformat(' ') ,
                                              "--end-date",endtime.isoformat(' '),
                                              "--comp-start",starttime.isoformat(' ') ,
                                              "--comp-step", '0000000000 00:15:00.000', "--comp-frac-valid", '1.', "--comp-stat-proc", '1:1',
                                              rmap.settings.dsnsample_fixed,
                                              filename]
                    logging.info(str(command).replace("', '","' '").replace("[","").replace("]",""))
                    compose.procs=[subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})]
                    body,outerr=compose.procs[0].communicate()

                    status=compose.procs[0].wait()
                    if status != 0:
                        logging.error("There were some errors executing v7d_transform: %s %s " % (status,outerr))
                    else:
                        totalbody+=body


                # work on parameter requested for 60 sec mean 
                if mymeta["trange"][0] != 0 or mymeta["trange"][1] != 0 or mymeta["trange"][2] != 60 : continue


                # I DO NOT use AMQP here, direct copy !
                logging.info("sample-> sample")
                logging.info("istantanee a medie 60sec")

                command=["v7d_transform","--input-format","dba","--output-format","dba",
                                              "--variable-list",variable_list,"--level",level,"--timerange",'254,0,0',
                                              "--start-date",starttime.isoformat(' ') ,
                                              "--end-date",endtime.isoformat(' '),
                                              "--comp-start",starttime.isoformat(' ') ,
                                              "--comp-step", '0000000000 00:01:00.000', "--comp-frac-valid", '.0002', "--comp-stat-proc", '254:0',
                                              rmap.settings.dsnsample_fixed,
                                              rmap.settings.dsnsample_fixed]
                logging.info(str(command).replace("', '","' '").replace("[","").replace("]",""))
                compose.procs=[subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})]

                rw_lock = fasteners.InterProcessReaderWriterLock(rmap.settings.lockfilesample_fixed)
                with rw_lock.write_lock():
                        body,outerr=compose.procs[0].communicate()
                        status=compose.procs[0].wait()
                        
                if status != 0:
                    logging.error("There were some errors executing v7d_transform: %s %s " % (status,outerr))


                logging.info("sample-> report")
                logging.info("medie 60sec a media 15'")
                command=["v7d_transform","--input-format","dba","--output-format","BUFR",
                                              "--variable-list",variable_list,"--level",level,"--timerange",'0,0,60',
                                              "--start-date",starttime.isoformat(' ') ,
                                              "--end-date",endtime.isoformat(' '),
                                              "--comp-start",starttime.isoformat(' ') ,
                                              "--comp-step", '0000000000 00:15:00.000', "--comp-frac-valid", '.9', "--comp-stat-proc", '0:0',
                                              rmap.settings.dsnsample_fixed,
                                              filename]
                logging.info(str(command).replace("', '","' '").replace("[","").replace("]",""))
                compose.procs=[subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})]
                body,outerr=compose.procs[0].communicate()

                status=compose.procs[0].wait()
                if status != 0:
                    logging.error("There were some errors executing v7d_transform: %s %s " % (status,outerr))

                else:

                    totalbody+=body


                logging.info("sample-> report")
                logging.info("medie 60sec a massima 15'")
                command=["v7d_transform","--input-format","dba","--output-format","BUFR",
                                              "--variable-list",variable_list,"--level",level,"--timerange",'0,0,60',
                                              "--start-date",starttime.isoformat(' ') ,
                                              "--end-date",endtime.isoformat(' '),
                                              "--comp-start",starttime.isoformat(' ') ,
                                              "--comp-step", '0000000000 00:15:00.000', "--comp-frac-valid", '.9', "--comp-stat-proc", '0:2',
                                              rmap.settings.dsnsample_fixed,
                                              filename]
                logging.info(str(command).replace("', '","' '").replace("[","").replace("]",""))
                compose.procs=[subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})]
                body,outerr=compose.procs[0].communicate()

                status=compose.procs[0].wait()
                if status != 0:
                    logging.error("There were some errors executing v7d_transform: %s %s " % (status,outerr))

                else:

                    totalbody+=body

                logging.info("sample-> report")
                logging.info("medie 60sec a minima 15'")
                command=["v7d_transform","--input-format","dba","--output-format","BUFR",
                                              "--variable-list",variable_list,"--level",level,"--timerange",'0,0,60',
                                              "--start-date",starttime.isoformat(' ') ,
                                              "--end-date",endtime.isoformat(' '),
                                              "--comp-start",starttime.isoformat(' ') ,
                                              "--comp-step", '0000000000 00:15:00.000', "--comp-frac-valid", '.9', "--comp-stat-proc", '0:3',
                                              rmap.settings.dsnsample_fixed,
                                              filename]
                logging.info( str(command).replace("', '","' '").replace("[","").replace("]",""))
                compose.procs=[subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})]
                body,outerr=compose.procs[0].communicate()

                status=compose.procs[0].wait()
                if status != 0:
                    logging.error("There were some errors executing v7d_transform: %s %s " % (status,outerr))

                else:

                    totalbody+=body


                logging.info("sample-> report")
                logging.info("medie 60sec a istantanee a step 15' senza calcoli con selezione del valore piu' prossimo nel tempo nell'intervallo di 30sec")

                command1=["v7d_transform","--input-format","dba","--output-format","BUFR",
                          "--variable-list",variable_list,"--level",level,"--timerange",'0,0,60',
                          "--start-date",starttime.isoformat(' ') ,
                          "--end-date",endtime.isoformat(' '),
                          "--comp-start",starttime.isoformat(' ') ,
                          "--comp-step", '0000000000 00:01:00.000', "--comp-frac-valid", '.9', "--comp-stat-proc", '0:254',
                          rmap.settings.dsnsample_fixed,
                          filename]

                command2=["v7d_transform","--input-format","BUFR","--output-format","BUFR",
                          "--comp-start",starttime.isoformat(' ') ,
                          "--comp-step", '0000000000 00:15:00.000','--comp-fill-data', '--comp-fill-tolerance', '0000000000 00:00:60.000', '--comp-filter-time',  
                          filename,
                          filename]

                logging.info(str(command1).replace("', '","' '").replace("[","").replace("]",""))
                logging.info(str(command2).replace("', '","' '").replace("[","").replace("]",""))
                p1=subprocess.Popen(command1,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})
                p2=subprocess.Popen(command2,stdin=p1.stdout,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})
                p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
                compose.procs=[p1,p2]
                body,outerr=compose.procs[1].communicate()

                status=compose.procs[0].wait()
                if status != 0:
                    logging.error("There were some errors executing v7d_transform 1 : %s %s " % (status,outerr))
                else:
                    status=compose.procs[1].wait()
                    if status != 0:
                        logging.error("There were some errors executing v7d_transform 2: %s %s " % (status,outerr))
                    else:
                        totalbody+=body


                # I DO NOT use AMQP here, direct copy !
                logging.info("sample-mobile -> report-mobile")
                logging.info("copia senza calcoli")

                command1=["dbadb","export","--dsn",rmap.settings.dsnsample_mobile,"varlist",variable_list,
                          "yearmin",str(starttime.year),"monthmin",str(starttime.month),"daymin",str(starttime.day),"hourmin",str(starttime.hour),"minumin",str(starttime.minute),"secmin",str(starttime.second),
                          "yearmax",str(endtime.year),"monthmax",str(endtime.month),"daymax",str(endtime.day),"hourmax",str(endtime.hour),"minumax",str(endtime.minute),"secmax",str(endtime.second)]
                command2=["dbadb","import","--dsn",rmap.settings.dsnreport_mobile]

                logging.info(str(command1).replace("', '","' '").replace("[","").replace("]",""))
                logging.info(str(command2).replace("', '","' '").replace("[","").replace("]",""))
                p1=subprocess.Popen(command1,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})
                p2=subprocess.Popen(command2,stdin=p1.stdout,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env={"LOG4C_PRIORITY":"info"})
                p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
                compose.procs=[p1,p2]
                
                rw_lock = fasteners.InterProcessReaderWriterLock(rmap.settings.lockfilereport_mobile)
                with rw_lock.write_lock():
                        body,outerr=compose.procs[1].communicate()
                        status=compose.procs[0].wait()
                if status != 0:
                    logging.error("There were some errors executing dbadb 1 : %s %s " % (status,outerr))
                else:
                    status=compose.procs[1].wait()
                    if status != 0:
                        logging.error("There were some errors executing dbadb 2: %s %s " % (status,outerr))
                        

                #sample-> report
                # deviazione standard su un minuto dalle istantanee DA AGGIUSTARE!
                #                             v7d_transform --input-format BUFR --output-format BUFR --variable-list 'B12101' \
	        #                             --level '103,2000,,' --timerange '254,0,0' \
	        #                             --start-date '2017-05-07 00:00:00' --end-date '2017-05-08 12:00:00' \
	        #                             --comp-start '2017-05-07 00:00:00' --comp-step '0000000000 00:01:00.000' \
	        #                             --comp-frac-valid '.002' --comp-stat-proc '254:6' tutto.bufr stddev.bufr


                if totalbody != b"":

                    try:
                        # Legge un file.
                        #in_file = open(filename,"r")
                        #in_file = os.fdopen(fd, "r")
                        #body = in_file.read()
                        #in_file.close()
                        
                        credentials=pika.PlainCredentials(user, password)
                        properties=pika.BasicProperties(
                            user_id= user,
                            delivery_mode = 2, # persistent
                        )

                        # connection_attempts (int) - Maximum number of retry attempts
                        # retry_delay (int|float) - Time to wait in seconds, before the next
                        # socket_timeout (int|float) - Use for high latency networks
                        
                        connection = pika.BlockingConnection(pika.ConnectionParameters(
                            host=host,credentials=credentials,
                            connection_attempts=3,
                            retry_delay=5,
                            socket_timeout=3.))

                        channel = connection.channel()

                        #channel.queue_declare(queue=queue)
                        
                        # Turn on delivery confirmations
                        channel.confirm_delivery()

                        channel.basic_publish(exchange=exchange,
                                                 routing_key=routing_key,
                                                 body=totalbody,
                                                 properties=properties)
                        logging.info(" [x] message Sent ")
                        connection.close()

                    except:
                        raise

                    #finally:
                        #os.remove(filename)

            except:
                logging.error("There were some errors executing dba_transform")
                raise

def job(delta,offset,compose):
    now=datetime.datetime.now()
    logging.info(f"Start work delta={delta} offset={offset}")
    #now=datetime.datetime.now(datetime.UTC)
    now=datetime.datetime.utcnow()
    newminute = now.minute - (now.minute % (delta//60))
    endtime=(now.replace(minute=newminute,second=0,microsecond=0))

    if (delta >= 60*60):
        newhour = now.hour - (now.hour % (delta//(60*60)))
        endtime=(endtime.replace(hour=newhour,minute=0,second=0,microsecond=0))

    endtime-=datetime.timedelta(seconds=offset)    
    starttime=endtime-datetime.timedelta(seconds=delta)
    logging.info(f"start time {starttime}; end time {endtime}")
    compute(starttime,endtime,compose)
    logging.info(f"Work terminated")

def main(compose):

    # configure the logger
    formatter=logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
    handler = logging.handlers.RotatingFileHandler(compose.options.stdout, maxBytes=5000000, backupCount=10)
    handler.setFormatter(formatter)
    
    # Add the log message handler to the root logger
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.INFO)
        
    logging.info('Starting up composereportd')
    
    #    my_env = os.environ
    #    my_env["PYTHONPATH"] = "/usr/local/lib/python2.7/site-packages" + my_env.get("PYTHONPATH","")

    # ultimi 15 minuti ogni 15 minuti
    schedule.every().hour.at("16:00").do(job, delta=60*15, offset=0, compose=compose)
    schedule.every().hour.at("31:00").do(job, delta=60*15, offset=0, compose=compose)
    schedule.every().hour.at("46:00").do(job, delta=60*15, offset=0, compose=compose)
    # ultimi 60 minuti ogni ora
    schedule.every().hour.at("01:00").do(job, delta=60*60, offset=0, compose=compose)

    # da 6 a una ora fa ogni 6 ore
    schedule.every().day.at("00:05","UTC").do(job, delta=60*60*5, offset=60*60, compose=compose)
    schedule.every().day.at("06:05","UTC").do(job, delta=60*60*5, offset=60*60, compose=compose)
    schedule.every().day.at("12:05","UTC").do(job, delta=60*60*5, offset=60*60, compose=compose)
    schedule.every().day.at("08:05","UTC").do(job, delta=60*60*5, offset=60*60, compose=compose)

    # 48 ore a partire dall'ultimo giorno in archivio
    schedule.every().day.at("01:05","UTC").do(job, delta=60*60*24*2, offset=60*60*24*6, compose=compose)
    # 24 ore a partire dal penultimo giorno in archivio e così via
    schedule.every().day.at("02:05","UTC").do(job, delta=60*60*24, offset=60*60*24*5, compose=compose)
    schedule.every().day.at("03:05","UTC").do(job, delta=60*60*24, offset=60*60*24*4, compose=compose)
    schedule.every().day.at("04:05","UTC").do(job, delta=60*60*24, offset=60*60*24*3, compose=compose)
    schedule.every().day.at("05:05","UTC").do(job, delta=60*60*24, offset=60*60*24*2, compose=compose)
    schedule.every().day.at("07:05","UTC").do(job, delta=60*60*24, offset=60*60*24,   compose=compose)

    while True:
        n = schedule.idle_seconds()
        if n is None:
            # no more jobs
            break
        elif n > 0:
            # sleep exactly the right amount of time
            time.sleep(n)
        elif n < 0:
            logging.error("I AM LATE! somethings will be lost")
        
        schedule.run_pending()


if __name__ == '__main__':

    import sys, os
    composereportd.cwd=os.getcwd()
    
    if composereportd.service():

        sys.stdout.write("Daemon started with pid %d\n" % os.getpid())
        sys.stdout.write("Daemon stdout output\n")
        sys.stderr.write("Daemon stderr output\n")

        main(composereportd)  # (this code was run as script)

        for proc in composereportd.procs:
            proc.wait()

        sys.exit(0)
