Skip to content
mar 28 / David Regnier

Création d’un rapport XML agrégé avec Python

Voici un module Python complet qui produit, à partir d'un fichier source, un rapport XML (de type agrégé). Ce module est mis à disposition afin que vous puissiez l'adapter à vos besoins.

Prérequis:

  • Python 2.5
  • La librairie Python config: Librairie Python config
  • La librairie xml.dom.minidom, documentation: xml.dom.minidom
  • Les librairies ToolsBox et LogStats sont disponibles dans le fichier « .zip » sur cette page

Voici le script principal. Pour obtenir le module complet, merci de télécharger le fichier « .zip » présent à la fin de cet article.

#    Mode: Python tab-width: 4
#    Id: make_acdr_eu_nq_report.py
#    Author: David REGNIER
#    Description: Produce XML file for XXXX report
#
# ======================================================================
# Copyright 2014 by David REGNIER
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name David
# REGNIER not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
# ======================================================================

# Load Python lib
import os, sys, csv, traceback, textwrap
import xml.dom.minidom # Lib for XML

# Lib for config variables
# The script cannot run without ConfigObj: log the traceback and stop with a
# non-zero status so the calling shell/scheduler sees the failure.
try:
    from lib.config.configobj import ConfigObj
except ImportError:
    sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
    sys.exit(1)

# Lib for helpers
# ToolsBox and LogStats are required by every step below: log the traceback
# and stop with a non-zero status when they cannot be imported.
try:
    from lib.helper.helper_misc import ToolsBox, LogStats
except ImportError:
    sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
    sys.exit(1)

# Read the settings used by the whole module from the configuration file
oConfig = ConfigObj('config/config.cfg')

# Logs are initialised before anything else: the error log first, then
# standard output is replaced by a LogStats instance (stats log)
ToolsBox.init_error_log()
sys.stdout = LogStats()

# Folder tree of this module: base folders first, then the month folders
# below the input folder and below the output folder
_aFileSection = oConfig['file_section']
ToolsBox.init_folder_tree(
    '',
    _aFileSection['hors_quota_folder_name_input'],
    _aFileSection['hors_quota_folder_name_output'],
    _aFileSection['hors_quota_folder_name_log'],
    _aFileSection['hors_quota_folder_name_doc']
)
for _sFolderKey in ('hors_quota_folder_name_input', 'hors_quota_folder_name_output'):
    ToolsBox.init_folder_tree(_aFileSection[_sFolderKey], ToolsBox.get_months())

"""Begin BuildXML class"""
class BuildXML:
    # Description: Constructor, stores the report parameters on the instance
    #
    # @param sMonthYear: Month and Year string (MMYYYY)
    # @param sActionType: Action type, U for Update, C for create
    # @param sRootElementName: Root element name
    # @param sURL: URL namespace
    # @param sInputFile: Input source file name
    # @param sOutputFile: Output XML file name
    # @param sXMLEncoding: (Optional) XML Encoding, default: UTF-8
    # @param iStartRow: (Optional) Number of header lines to skip, default: 3
    # @return: void
    def __init__(self, sMonthYear, sActionType, sRootElementName = 'ns1:BasicAttribute', sURL = '', sInputFile = '', sOutputFile = '', sXMLEncoding = 'UTF-8', iStartRow = 3):
        self.sMonthYear = sMonthYear
        self.sActionType = sActionType
        self.sRootElementName = sRootElementName
        self.sURL = sURL
        self.sInputFile = sInputFile
        self.sOutputFile = sOutputFile
        self.sXMLEncoding = sXMLEncoding
        self.iStartRow = iStartRow

    # Description: Build XML report for EU_NQ
    #              Call sample: BuildXML.build_hors_quota_report(self)
    #              The input file is read as tab separated values after
    #              skipping self.iStartRow header lines. The file handle is
    #              always closed, whatever happens while reading.
    # @param self: Constructor parameters
    # @return: the XML document, or None when a row with an empty area, an
    #          empty sovereignty water or an empty/zero quantity was found.
    #          On any exception the traceback is written to stderr and the
    #          process exits with status 1.
    def build_hors_quota_report(self):
        try:
            oDocument = xml.dom.minidom.Document() # Set XML object
            rootNode = oDocument.createElementNS(self.sURL, self.sRootElementName) # Create root node
            rootNode.setAttribute("xmlns:ns1", oConfig['xml_section']['default_namespace_ns1'])
            rootNode.setAttribute("xmlns:ns2", oConfig['xml_section']['default_namespace_ns2'])
            oDocument.appendChild(rootNode)

            SpecifiedAggregatedCatchReport = self._append_report_header(oDocument, rootNode)

            # Pre-check
            ToolsBox.check_empty_file(self.sInputFile)

            # Open source file; try/finally guarantees the handle is released
            # even when a row is malformed (no "with": Python 2.5 compatible)
            oFileHandle = open(self.sInputFile, 'rb')
            try:
                oReader = csv.reader(oFileHandle, delimiter = '\t') # Reader pointer

                # Skip header lines (nothing is skipped when iStartRow <= 0)
                for _iSkipped in xrange(self.iStartRow):
                    oReader.next()

                bAllRowsValid = self._append_reported_areas(oDocument, SpecifiedAggregatedCatchReport, oReader)
            finally:
                oFileHandle.close()

            if not bAllRowsValid:
                return None # Invalid source data, no document is produced
            return oDocument
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1) # Non-zero status: the failure must be visible to the caller

    # Description: Append the fixed part of the message (identification,
    #              exchange information, reporting period, sender) below
    #              the root node
    #
    # @param oDocument: XML document being built
    # @param rootNode: Root node of the document
    # @return: the ns1:SpecifiedAggregatedCatchReport node that receives the
    #          reported areas
    def _append_report_header(self, oDocument, rootNode):
        aXMLSection = oConfig['xml_section']
        sMonth = self.sMonthYear[0:2]
        sYear = self.sMonthYear[2:6]

        ToolsBox.set_xml_node(oDocument, 'ns1:MessageID', rootNode, ToolsBox.set_uuid()) # Message identifier given by ToolsBox.set_uuid()
        ToolsBox.set_xml_node(oDocument, 'ns1:Creation', rootNode, ToolsBox.get_current_datetime('%Y-%m-%dT%H:%M:%S'))

        ExchangeDocumentInfo = ToolsBox.set_xml_node(oDocument, 'ns1:ExchangeDocumentInfo', rootNode)
        ToolsBox.set_xml_node(oDocument, 'ns1:ActionType', ExchangeDocumentInfo, self.sActionType) # Set "C" which is [Create] or "U" is [Update]
        ToolsBox.set_xml_node(oDocument, 'ns1:EditorType', ExchangeDocumentInfo, aXMLSection['hors_quota_report_editor_type']) # Set the editor of the message
        ToolsBox.set_xml_node(oDocument, 'ns1:ActionReason', ExchangeDocumentInfo, aXMLSection['hors_quota_report_action_reason']) # Set reason why the action
        ToolsBox.set_xml_node(oDocument, 'ns1:ReferencedMessage', ExchangeDocumentInfo) # Empty node

        SpecifiedAggregatedCatchReport = ToolsBox.set_xml_node(oDocument, 'ns1:SpecifiedAggregatedCatchReport', ExchangeDocumentInfo)
        ToolsBox.set_xml_node(oDocument, 'ns1:ReporterIdentification', SpecifiedAggregatedCatchReport, aXMLSection['hors_quota_report_reporter_identification'])

        ReportingPeriod = ToolsBox.set_xml_node(oDocument, 'ns1:ReportingPeriod', SpecifiedAggregatedCatchReport)
        ToolsBox.set_xml_node(oDocument, 'ns2:StartDate', ReportingPeriod, ToolsBox.get_first_day_of_month(ToolsBox.get_current_datetime_by_string(sMonth, sYear), '-'))
        ToolsBox.set_xml_node(oDocument, 'ns2:EndDate', ReportingPeriod, ToolsBox.get_end_day_of_month(ToolsBox.get_current_datetime_by_string(sMonth, sYear), '-'))

        ReportSenderFLUX_ACDR_Party = ToolsBox.set_xml_node(oDocument, 'ns1:ReportSenderFLUX_ACDR_Party', SpecifiedAggregatedCatchReport)
        ToolsBox.set_xml_node(oDocument, 'ns2:Identification', ReportSenderFLUX_ACDR_Party, aXMLSection['hors_quota_report_identification'])
        ToolsBox.set_xml_node(oDocument, 'ns2:Name', ReportSenderFLUX_ACDR_Party, aXMLSection['hors_quota_report_name'])

        return SpecifiedAggregatedCatchReport

    # Description: Convert the raw quantity cell to the string written in
    #              the report: decimal comma accepted, value divided by 1000
    #              (NOTE(review): presumably kilograms to tonnes, confirm
    #              against hors_quota_report_unit_code)
    #
    # @param sRawQuantity: Quantity cell as read from the source file
    # @return: string; '' when the cell is empty so that the caller can log
    #          it as an invalid quantity instead of failing in float()
    def _convert_quantity(self, sRawQuantity):
        sNormalized = sRawQuantity.strip().replace(',', '.')
        if sNormalized == '':
            return ''
        return str(float(sNormalized) / 1000)

    # Description: Read every data row, sort the rows by area and append one
    #              ns1:SpecifiedReportedArea node per distinct
    #              (column 6, column 7, column 2) combination, with one
    #              ns2:QuantifiesReportedCatch node per row
    #
    # @param oDocument: XML document being built
    # @param SpecifiedAggregatedCatchReport: Parent node of the reported areas
    # @param oReader: csv reader positioned on the first data row
    # @return: bool, False as soon as an invalid row is found (the reason is
    #          printed), True when every row has been added
    def _append_reported_areas(self, oDocument, SpecifiedAggregatedCatchReport, oReader):
        # The row index is appended to the key so that rows sharing the same
        # area do not overwrite each other in the dictionary
        aDictionary = {}
        for iIndex, aSourceRow in enumerate(oReader):
            aDictionary[aSourceRow[6] + '_' + aSourceRow[7] + '_' + aSourceRow[2] + '_' + str(iIndex + 1)] = aSourceRow

        sTempSpecifiedReportedArea = ''
        SpecifiedReportedArea = None

        # Start main loop on sorted dictionary
        for key in sorted(aDictionary):
            aRow = aDictionary[key]
            sFAOArea = aRow[6]
            sSovereigntyWater = aRow[7]
            sQuantity = self._convert_quantity(aRow[10])
            bInvalidQuantity = sQuantity in ('', '0', '0.0')

            # FAOArea, SovereigntyWater and a non-zero Quantity must exist
            # for declaration: log every missing value and give up
            if sFAOArea == '' or sSovereigntyWater == '' or bInvalidQuantity:
                if sFAOArea == '':
                    print('INFO: %s' % key + ' ' + oConfig['label_section']['stat_label_invalid_zone'])
                if sSovereigntyWater == '':
                    print('INFO: %s' % key + ' ' + oConfig['label_section']['stat_label_invalid_sovereignty_water'])
                if bInvalidQuantity:
                    print('INFO: %s' % key + ' ' + oConfig['label_section']['stat_label_invalid_quantity'])
                return False

            # Split key
            sPattern = key.split('_')

            # Pre-check, we stop if null element in the key has been found
            ToolsBox.check_empty_element_in_array(sPattern, 4) # 4 is the number of element in the key

            sSpecifiedReportedArea = sPattern[0] + sPattern[1] + sPattern[2]

            # Rows are sorted, so a new area node is only needed when the
            # area differs from the one of the previous row
            if sSpecifiedReportedArea != sTempSpecifiedReportedArea:
                print('SpecifiedReportedArea : %s' % sSpecifiedReportedArea)
                SpecifiedReportedArea = ToolsBox.set_xml_node(oDocument, 'ns1:SpecifiedReportedArea', SpecifiedAggregatedCatchReport)
                ToolsBox.set_xml_node(oDocument, 'ns2:FAOArea', SpecifiedReportedArea, sPattern[0])
                ToolsBox.set_xml_node(oDocument, 'ns2:SovereigntyWater', SpecifiedReportedArea, sPattern[1])
                ToolsBox.set_xml_node(oDocument, 'ns2:LandingPlace', SpecifiedReportedArea, sPattern[2])
                sTempSpecifiedReportedArea = sSpecifiedReportedArea

            # QuantifiesReportedCatch part, attached to the current area
            print('\tQuantifiesReportedCatch : %s' % aRow[3] + sQuantity + aRow[2])
            QuantifiesReportedCatch = ToolsBox.set_xml_node(oDocument, 'ns2:QuantifiesReportedCatch', SpecifiedReportedArea)
            ToolsBox.set_xml_node(oDocument, 'ns2:Species', QuantifiesReportedCatch, aRow[3])
            ToolsBox.set_xml_node(oDocument, 'ns2:Quantity', QuantifiesReportedCatch, sQuantity , 'unitCode', oConfig['xml_section']['hors_quota_report_unit_code'])
            ToolsBox.set_xml_node(oDocument, 'ns2:LandingIndicator', QuantifiesReportedCatch, aRow[2])

        return True

    # Description: Write XML flow to XML file
    #
    # @param self: Constructor parameters
    # @param oDocument: XML flow document
    # @return: int, 0 on success. On any exception the traceback is written
    #          to stderr and the process exits with status 1.
    def write_xml_report(self, oDocument):
        try:
            # Serialise first: a failure here must not truncate an existing
            # output file
            sXMLFlow = oDocument.toprettyxml(encoding=self.sXMLEncoding)
            oFileOutput = open(self.sOutputFile, 'w')
            try:
                oFileOutput.write(sXMLFlow)
            finally:
                oFileOutput.close()
            return 0
        except Exception:
            sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
            sys.exit(1)
"""End BuildXML class"""

# Description: Exec module: locate the input file of the requested month,
#              build the XML report, write it and check the written file
#
# @param sMonthYear: MonthYear string (MMYYYY), the month selects the
#                    input/output sub-folder
# @param sActionType: U if needed, C is the default
# @return: int, 0 on success. On any failure (exception, invalid source
#          data, malformed output XML) the process exits with status 1.
def exec_module(sMonthYear, sActionType = 'C'):
    try:
        sMonth = sMonthYear[0:2]
        aFileSection = oConfig['file_section']

        # os.path.join instead of a hard-coded "\\" so that the paths are
        # valid on every platform (identical result on Windows)
        sInputFolder = os.path.join(os.getcwd(), aFileSection['hors_quota_folder_name_input'], sMonth)
        sOutputFolder = os.path.join(os.getcwd(), aFileSection['hors_quota_folder_name_output'], sMonth)

        # Pre-process, scan input folder, if file pattern exist
        sRealFileName = ToolsBox.check_input_file(sInputFolder, aFileSection['hors_quota_file_name_input'])

        # Assign input/output files
        sInputFile = os.path.join(sInputFolder, sRealFileName)
        sOutputFile = os.path.join(
            sOutputFolder,
            aFileSection['hors_quota_file_name_output'].replace("%pattern%", ToolsBox.get_current_datetime('%Y%m%d'))
        )

        print('INFO: %s' % 'Starting : oBuildXMLHORSQUOTAReport = BuildXML')
        print('INFO: %s' % 'Param : hors_quota_root_node_name : ' + oConfig['xml_section']['hors_quota_root_node_name'])
        print('INFO: %s' % 'Param : hors_quota_file_name_input : ' + sInputFile)
        print('INFO: %s' % 'Param : hors_quota_file_name_output : ' + sOutputFile)
        print('INFO: %s' % 'Param : default_encoding : ' + oConfig['xml_section']['default_encoding'])
        print('INFO: %s' % 'Param : default_header_line_number : ' + oConfig['xml_section']['default_header_line_number'])

        # Build XML object
        #
        # @param : Month and Year string
        # @param : Action type, C / U
        # @param : Root node name
        # @param : Namespace name
        # @param : Input file for reporting
        # @param : Output file for reporting
        # @param : XML encoding
        # @param : Header line (will skip N lines before processing data)
        # @return: object
        oBuildXMLHORSQUOTAReport = BuildXML(
            sMonthYear,
            sActionType,
            oConfig['xml_section']['hors_quota_root_node_name'],
            oConfig['xml_section']['default_namespace_url'],
            sInputFile,
            sOutputFile,
            oConfig['xml_section']['default_encoding'],
            int(oConfig['xml_section']['default_header_line_number'])
        )

        print('INFO: %s' % 'Starting : oBuildXMLHORSQUOTAReport.build_hors_quota_report()')
        # Call hors quota report
        oDocument = oBuildXMLHORSQUOTAReport.build_hors_quota_report()
        print('INFO: %s' % 'Starting : oBuildXMLHORSQUOTAReport.write_xml_report(oDocument)')
        if not oDocument:
            # No document means invalid source data: log it, then let the
            # handler below write the traceback and exit with status 1
            print('ERROR: %s' % 'An empty value has been detected from source file : ' + sInputFile)
            print('INFO: %s' % 'Exit code 1')
            raise Exception('An empty value has been detected from source file : ' + sInputFile)

        # Save hors quota report to XML file
        oBuildXMLHORSQUOTAReport.write_xml_report(oDocument)
        print('INFO: %s' % 'Starting post-process : ToolsBox.check_xml_format(...)')
        # Post validation, check well formed XML
        if ToolsBox.check_xml_format(sOutputFile) is False:
            sys.exit(1)
        print('INFO: %s' % 'XML file report has been successfully created : ' + sOutputFile)
        print('INFO: %s' % 'Exit code 0')
        return 0
    except Exception:
        sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
        sys.exit(1) # The status must match the logged "Exit code 1"

# Description: Main entry point: display the menu, read ONE command from
#              standard input and run it. Every command ends the function,
#              the menu is never displayed twice.
#              Accepted commands (surrounding blanks are ignored):
#                E or e     exit
#                MMYYYY     "Create" message for this month
#                MMYYYY U   "Update" message for this month (the 8th
#                           character must be U or u)
#
# @return: int, 0 after a successful run, an exit request or an unknown
#          command, 1 when the month is not valid. On any exception the
#          traceback is written to stderr and the process exits with
#          status 1.
def main():
    try:
        # Display menu for user
        sys.stdout.write(
            textwrap.dedent("""\
            Valid commands are:
            -------------------------------------------
            [E] to exit
            [MMYYYY] (Sample: 012013, will do month 01, year 2013 as "Create" XML message)
            [MMYYYY U] (Sample: 012014 U, will do month 01, year 2014 as "Update" XML message)
            """)
        )

        # Read input
        sCommand = raw_input().strip()
        if sCommand.upper() == 'E':
            return 0

        sMonthYear = sCommand[0:6]
        if len(sCommand) == 6 and sMonthYear.isdigit():
            sActionType = 'C'
        elif len(sCommand) > 7 and sCommand[7].lower() == 'u' and sMonthYear.isdigit():
            # The length is tested first: indexing a short command would
            # raise IndexError instead of reporting a bad command
            sActionType = 'U'
        else:
            print('Unknown command or bad command')
            return 0

        # Check month format
        sMonth = sMonthYear[0:2]
        if not any(sMonth in aList for aList in ToolsBox.get_months()):
            print('Wrong MONTH format : ' + sMonth)
            return 1

        exec_module(sMonthYear, sActionType)
        return 0
    except Exception:
        sys.stderr.write('ERROR: %s\n' % traceback.format_exc())
        sys.exit(1)

if __name__ == '__main__':
    sys.exit(main())
Voici un exemple de fichier de sortie produit pour ce cas métier :
<rsm:BasicAttribute xmlns:rsm="urn:xeu:ec:fisheries:flux-bl:RegionalReport:1:1" xmlns:flux_ram="urn:xeu:ec:fisheries:flux-bl:AggregateBusinessInformationEntity:1:1">
   <rsm:MessageID>${#TestCase#uuid}</rsm:MessageID>
   <rsm:Creation>2013-12-31T12:00:00</rsm:Creation>
   <rsm:ExchangeDocumentInfo>
	  <rsm:ActionType>C</rsm:ActionType>
	  <rsm:EditorType>free text: "sample creator"</rsm:EditorType>
	  <rsm:ActionReason>free text: "to create examples"</rsm:ActionReason>
	  <rsm:ReferencedMessage/>
	  <rsm:SpecifiedAggregatedCatchReport>
		 <rsm:ReporterIdentification>${#TestSuite#FluxFR}</rsm:ReporterIdentification>
		 <rsm:ReportingPeriod>
			<flux_ram:StartDate>${#TestCase#startDate}</flux_ram:StartDate>
			<flux_ram:EndDate>${#TestCase#endDate}</flux_ram:EndDate>
		 </rsm:ReportingPeriod>
		 <rsm:ReportSenderFLUX_ACDR_Party>
			<flux_ram:Identification>${#TestSuite#fidesUser}</flux_ram:Identification>
			<flux_ram:Name>free text: "name of report creator"</flux_ram:Name>
		 </rsm:ReportSenderFLUX_ACDR_Party>
		 <rsm:SpecifiedRegional>
			<rsm:RegionalArea>3B23.</rsm:RegionalArea>
			<rsm:RegionalSpecies>SAL</rsm:RegionalSpecies>
			<rsm:SpecifiedReportedArea>
			   <flux_ram:FAOArea>27.3.b.23</flux_ram:FAOArea>
			   <flux_ram:SovereigntyWater>XEU</flux_ram:SovereigntyWater>
			   <flux_ram:LandingPlace>LVA</flux_ram:LandingPlace>
			   <flux_ram:QuantifiesReportedCatch>
				  <flux_ram:Species>SAL</flux_ram:Species>
				  <flux_ram:Quantity unitCode="C62">15</flux_ram:Quantity>
			   </flux_ram:QuantifiesReportedCatch>
			   <flux_ram:QuantifiesReportedCatch>
				  <flux_ram:Species>SAL</flux_ram:Species>
				  <flux_ram:Quantity unitCode="C62">5</flux_ram:Quantity>
			   </flux_ram:QuantifiesReportedCatch>
			</rsm:SpecifiedReportedArea>
		 </rsm:SpecifiedRegional>
	  </rsm:SpecifiedAggregatedCatchReport>
   </rsm:ExchangeDocumentInfo>
</rsm:BasicAttribute>

Le module complet est disponible en téléchargement ci-dessous ; vous pouvez le modifier en fonction de vos besoins :
make_acdr_eu_nq_report

Laisser un commentaire


9 × 7 =