#!/usr/bin/python3.6 -s
# GPL. (C) 2023 Paolo Patruno.

# This program is free software; you can redistribute it and/or modify 
# it under the terms of the GNU General Public License as published by 
# the Free Software Foundation; either version 2 of the License, or 
# (at your option) any later version. 
# 
# This program is distributed in the hope that it will be useful, 
# but WITHOUT ANY WARRANTY; without even the implied warranty of 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
# GNU General Public License for more details. 
# 
# You should have received a copy of the GNU General Public License 
# along with this program; if not, write to the Free Software 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
# 

TIMESTEP = 15              # execute dbadb import everi TIMESTEP seconds
PACKET_MAX_SIZE = 1000000  # max size in bytes for one dballe import
MESSAGES_MAX_NUMBER = 500  # max number of bufr for transaction (with posgresql 100/1000 is the better value by benchmark)

import os,io,signal
import logging,logging.handlers

os.environ['DJANGO_SETTINGS_MODULE'] = 'rmap.settings'
import django
django.setup()

from rmap import daemon
import pika
import traceback
import rmap.settings
import threading,queue,time
import dballe
from rmap.rmap_core import amqpConsumerProducer
from rmap.rmap_core import Message
import fasteners


#add option for topic and dsn
class  mydaemon(daemon.Daemon):

    def optionparser(self):
        op = super(mydaemon, self).optionparser()
        op.add_option("-d", "--datalevel",dest="datalevel", help="sample or report: define the istance to run: select topic, dns,logfile, errorfile and lockfile (default %default)", default=None)
        op.add_option("-s", "--stationtype",dest="stationtype", help="fixed or mobile: define the istance to run: select topic, dns,logfile, errorfile and lockfile (default %default)", default=None)
        #op.add_option("-t", "--topic",dest="topic", help="topic root to subscribe on mqtt broker (default %default)", default="rmap")
        #op.add_option("-d", "--dsn",dest="dsn", help="topic root to subscribe on mqtt broker (default %default)", default=rmap.settings.dsnrmap)
        return op 	  				 


amqp2dballed = mydaemon(
        stdin="/dev/null",
        stdout=rmap.settings.logfileamqp2dballed,
        stderr=rmap.settings.errfileamqp2dballed,
        pidfile=rmap.settings.lockfileamqp2dballed,
        user=rmap.settings.useramqp2dballed,
        group=rmap.settings.groupamqp2dballed
)


# catch signal to terminate the process
class GracefulKiller:
    kill_now = False
    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self,signum, frame):
        self.kill_now = True

    def keyboard_interrupt(self):
        self.kill_now = True

    def terminate(self):
        self.kill_now = True


class Threaded_bufr2dballe(threading.Thread):

    def __init__(self,dsn,lockfile,receive_queue,send_queue):
        threading.Thread.__init__(self)
        self._logging=logging.getLogger()
        self._dsn=dsn
        self._receive_queue=receive_queue
        self._send_queue=send_queue
        self._terminate_event = threading.Event()
        self._lockfile=lockfile
        
    def terminate(self):
        self._logging.info('Threaded_bufr2dballe Terminated')
        self._terminate_event.set()

    def have_to_terminate(self):
        return self._terminate_event.is_set()
   
    def run(self):

        self._logging.info('Threaded_bufr2dballe Started')
        timestart=time.time()
        saved_message=None
        rw_lock = fasteners.InterProcessReaderWriterLock(self._lockfile)  # for processes
        while True:

            if(self.have_to_terminate()):
                self._logging.info('Threaded_bufr2dballe exit')
                return

            # buffer messages for TIMESTEP sec
            now=time.time()
            timetosleep=TIMESTEP-(now-timestart)
            if timetosleep >0:
                self._logging.debug("sleep for: %s" % timetosleep)
                time.sleep(timetosleep)
            timestart=now
            
            messages=[]
            size=0
            go=True
            while (not saved_message is None or not self._send_queue.empty()) and go:
                if saved_message is None:
                    message=self._send_queue.get()
                else:
                    message=saved_message
                    saved_message=None
                    
                #if len(message.body) > PACKET_MAX_SIZE:
                #    self._logging.error("Message too big, skip it; tag %d, size %d" % (message.delivery_tag,len(message.body)))
                #    self._receive_queue.put_nowait(Message(False,message.delivery_tag))
                #else:
                size += len(message.body)
                if size <= PACKET_MAX_SIZE or len(messages)==0:
                    messages.append(message)
                else:
                    self._logging.warning("max size for packet, last message of size %s bytes will be send the next time" % len(message.body))
                    # save message for late
                    #self._send_queue.put(message)
                    saved_message=message
                    go=False
                    
            if len(messages) > 0:
                self._logging.info("elaborate %s messages" % len(messages))
                totalbody=b""
                totaltag=[]
                for message in messages:
                    totalbody += message.body
                    totaltag.append(message.delivery_tag)
                try:

                    totalbodyfile = io.BytesIO(totalbody) 

                    self._logging.debug("acquire lock on DB for write")
                    with rw_lock.write_lock():
                        self._logging.debug("lock acquired")
                        # Connect to the database
                        db = dballe.DB.connect(self._dsn)
                        importer = dballe.Importer("BUFR")
                        count=0
                        messages=[]
                        with importer.from_file(totalbodyfile) as f:
                            try:
                                for dballemessage in f:
                                    count+=1
                                    messages.append(dballemessage)
                                    if count >= MESSAGES_MAX_NUMBER:
                                        # Start a transaction for one segment of MESSAGES_MAX_NUMBER messages
                                        self._logging.info("start transaction")
                                        with db.transaction() as tr:
                                            for message in messages:
                                                try:
                                                    self._logging.debug("start dballe import")
                                                    tr.import_messages(message,overwrite=True, update_station=True,import_attributes=True)
                                                except KeyError:
                                                    self._logging.error("Message was rejected importing in DB: not all metadata defined! ")
                                        count=0
                                        messages=[]
                            except ValueError as exception:
                                self._logging.error("Error in bufr decoding")
                                self._logging.error('Exception occured: ' + str(exception))
                                self._logging.error(traceback.format_exc())


                                    
                        if count >0:                      # send last not completed segment
                            # Start a transaction
                            with db.transaction() as tr:
                                for message in messages:
                                    try:
                                        tr.import_messages(message,overwrite=True, update_station=True,import_attributes=True)
                                    except KeyError:
                                        self._logging.error("Message was rejected importing in DB: not all metadata defined! ")
                        
                    status=True
                            
                except Exception as exception:
                    self._logging.error("There were some errors import message in dballe")
                    self._logging.debug("skip message: ")
                    self._logging.debug(totalbody)
                    self._logging.debug("related to tag:")
                    for tag in totaltag:
                        self._logging.debug("tag: %d",tag)
                    self._logging.debug("---------------------")
                    self._logging.error('Exception occured: ' + str(exception))
                    self._logging.error(traceback.format_exc())
                    status=False
                        
                self._logging.info("Packet done: importing in dballe terminated")
                self._send_queue.task_done()
                self._logging.debug("enqueue confirm tag:")
                for tag in totaltag:
                    self._logging.debug("tag: %s %d",status,tag)
                    self._receive_queue.put_nowait(Message(status,tag))


def main(self):

    #arm the signal handler
    killer = GracefulKiller()
    
    # configure the logger
    formatter=logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
    handler = logging.handlers.RotatingFileHandler(self.options.stdout, maxBytes=5000000, backupCount=10)
    handler.setFormatter(formatter)
    
    # Add the log message handler to the root logger
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.INFO)

    logging.info('Starting up amqp2dballed')
    
    dsndict={"sample":{},"report":{}}
    dsndict["sample"]["fixed"]=rmap.settings.dsnsample_fixed
    dsndict["sample"]["mobile"]=rmap.settings.dsnsample_mobile
    dsndict["report"]["fixed"]=rmap.settings.dsnreport_fixed
    dsndict["report"]["mobile"]=rmap.settings.dsnreport_mobile

    lockfiledict={"sample":{},"report":{}}
    lockfiledict["sample"]["fixed"]=rmap.settings.lockfilesample_fixed
    lockfiledict["sample"]["mobile"]=rmap.settings.lockfilesample_mobile
    lockfiledict["report"]["fixed"]=rmap.settings.lockfilereport_fixed
    lockfiledict["report"]["mobile"]=rmap.settings.lockfilereport_mobile
    
    if (not self.options.datalevel is None):
        if not (self.options.datalevel in list(dsndict.keys())):
            logging.error('Invalid dsn')
            sys.stdout.write("Invalid dsn\n")
            return False

    if (not self.options.stationtype is None):
        if not (self.options.stationtype in list(dsndict[self.options.datalevel].keys())):
            logging.error('Invalid dsn')
            sys.stdout.write("Invalid dsn\n")
            return False

    if (self.options.stationtype is None != self.options.datalevel is None ):
            logging.error('Invalid: only one of datalevel and stationtype options are missed ')
            sys.stdout.write("Invalid: only one of datalevel and stationtype options are missed\n")
            return False

    if (self.options.stationtype is None and self.options.datalevel is None ):
        inqueue="rmap.dae.bufr.report_fixed_validated"
        dsn=rmap.settings.dsn
        lockfile=rmap.settings.lockfile
    else:        
        inqueue="rmap..bufr."+self.options.datalevel+"_"+self.options.stationtype
        dsn   = dsndict[self.options.datalevel][self.options.stationtype]
        lockfile=lockfiledict[self.options.datalevel][self.options.stationtype]
        
    logging.info('Queue: %s' % inqueue)
    logging.info('DSN: %s'% dsn)
    logging.info('Lockfile: %s'% lockfile)
    logging.info("Start version: "+rmap.__version__)

    print(' [*] Waiting for messages. To exit press CTRL+C')


    user=rmap.settings.amqpuser
    password=rmap.settings.amqppassword
    host=rmap.settings.amqphost
        
    send_queue = queue.Queue()
    receive_queue = queue.Queue()

    bufr2dballe=Threaded_bufr2dballe(dsn,lockfile,receive_queue,send_queue)
    amqp=amqpConsumerProducer(host=host,queue=inqueue,user=user,password=password,
                              receive_queue=receive_queue,send_queue=send_queue)

    bufr2dballe.start()
    amqp.start()

    # infinite loop
    while True:
        try:
            time.sleep(3) # do nothing
        except Exception as exception:
            logging.error('Exception occured: ' + str(exception))
            logging.error(traceback.format_exc())
            killer.terminate()
            
        # terminate on keyboard interrupt
        except KeyboardInterrupt:
            sys.stdout.write("keyboard interrupt\n")
            logging.info("keyboard interrupt\n")
            killer.keyboard_interrupt()

        if (not amqp.is_alive() or not bufr2dballe.is_alive()):
            logging.info("amqp aborted\n")
            killer.terminate()            
            
        # check if we have to terminate together with other exceptions
        if killer.kill_now:
            logging.info("killed by signal\n")

            amqp.terminate()
            bufr2dballe.terminate()
            amqp.join()
            bufr2dballe.join()
            logging.debug("Terminated")

            return False

if __name__ == '__main__':

    import sys, os
    amqp2dballed.cwd=os.getcwd()

    if amqp2dballed.service():

        sys.stdout.write("Daemon started with pid %d\n" % os.getpid())
        sys.stdout.write("Daemon stdout output\n")
        sys.stderr.write("Daemon stderr output\n")

        status=main(amqp2dballed)  # (this code was run as script)

        for proc in amqp2dballed.procs:
            proc.wait()

        sys.exit(status)
